- MPI 跨节点并行,并行度高
- 分布式系统
pThread 接近底层,灵活
- 提供虚拟地编写任何可知线程行为的能力
- 共享内存
openmp 隐藏底层细节
- 允许编译器和运行时系统决定线程行为的一些细节
- 使用openmp编写一些并行行为更容易
- 很难对一些底层的线程交互进行编程
- 共享内存
openmp
- 特征:
- 任务并行化
- 显式线程同步
编译选项(打开编译器支持) :
-fopenmp
头文件 :
#include <omp.h>
最基本的parallel指令 :
#pragma omp parallel
- 表明之后的结构化代码块(structured block, 基本块)应被多个线程并行执行
- 一个结构化代码块是一条C语句或者只有一个入口和一个出口的一组复合C语句,代码中允许调用exit函数
- (简单地禁止分支语句或离开结构化代码块)
线程(thread) : 执行线程(thread of execution)
- 共享派生(fork)它们的进行的大部分资源(如标准输入输出)
- 每个线程有自己的栈和计数器
- 完成执行后合并(join)到启动它的进程
指定线程数 :
#pragma omp parallel num_threads(thread_count)
num_threads
子句,其中thread_count为并发执行的线程数
- note : 程序可以启动的线程数可能受系统定义的限制,openmp不保证实际能够启动thread_count个线程(大部分系统能够启动数百、数千个线程)
程序开始执行时,进程启动
程序到达parallel指令时,原线程继续执行,启动另外thread_count - 1
个线程
- 原线程为主线程(master),额外的线程为从线程(slave)
执行并行块的线程集合(主线程 和 从线程)为线程组
隐式路障 : 完成代码块的线程将等待线程组中的所有其他线程完成代码块
- 之后从线程终止,主线程继续执行之后的(非并行)代码
获取线程编号 :
int omp_get_thread_num(void);
- [0, 1, …, thread_count-1]
获取线程组中的线程数 :
int omp_get_num_threads(void);
错误检查 : 检查编译器对OpenMP的支持
- 不支持OpenMP的编译器只忽略
parallel
指令
- 不支持OpenMP的编译器只忽略
|
- 临界区 :
#pragma omp critical
- 临界区 : 一个被多个更新共享资源的线程执行的代码(块),一次只能被一个线程更新
- 竞争条件(race condition) : 多个线程试图访问一个共享资源,并且至少其中一个访问是更新该共享资源(可能会导致错误)
- 将引起竞争条件的代码声明为临界区
一个能够被线程组中的所有线程访问的变量拥有共享作用域,一个只能被单个线程访问的变量拥有私有作用域
- 在parallel块中声明的变量在线程的(私有)栈中分配,都是私有变量(拥有私有作用域)
- 在parallel块之前声明的变量的缺省作用域是共享的
归约子句 :
reduction(<operator> : <variable list>)
- 如 :
#pragma omp parallel reduction(+ : global_result)
- 如 :
operator
:+
,*
,&
,|
,^
,&&
,||
之一
- 即 归约操作符,满足交换律和结合律的二元运算符(减法不满足,需将减法转成加法)
variable list
: 归约变量
归约就是将相同的归约操作符重复地应用到操作数序列得到一个结果的运算
所有操作的中间结果存储在同一个变量(归约变量, reduction variable)中note : 浮点型数据的运算不满足结合律(截断误差的影响)
- 循环并行化 :
#pragma omp parallel for
- 结构化代码块必须是for循环
- 系统通过在线程间划分循环迭代并行化for循环
- 线程间的缺省划分方式是由系统决定的,大部分粗略地使用块划分
- 和parallel指令的不同 :
- (1)parallel的块的具体工作一般须线程本身在线程之间划分
- (2)parallel指令之前声明的变量缺省作用域是共享的,parallel for指令循环变量的缺省作用域是私有的
warning :
OpenMP只会并行化for循环,不会并行化while或do-while循环(需转成等效的for循环)
OpenMP只能并行化在以下情况下确定迭代次数的for循环
(1)由for语句本身(for (...;...;...)
)确定
(2)在循环之前确定
— 无限循环不能并行化
— 有break语句的循环不能并行化(break添加了另一个从循环退出的出口)
OpenMP只能并行化具有典型结构的for循环
(1)变量index必须是整型或指针类型(非float型浮点数)
(2)表达式start、end、incr必须有一个兼容的类型
(3)表达式start、end、incr不能在循环执行期间改变
(4)在循环期间变量index只能够被for语句的增量表达式修改note : 循环体中可以有一个exit调用
- 数据依赖性 : 循环中,迭代中的计算依赖于一个或多个先前的迭代结果
- OpenMP编译器不检查被parallel for指令并行化的循环所包含的迭代间的依赖关系(由程序员识别)
- 一个或多个迭代结果依赖于其它迭代的循环,一般不能被OpenMP正确地并行化
循环依赖(loop-carried dependence)
注意缺省情况下任何在循环前声明的变量(除了例外的循环变量)在线程间都是共享的不可并行化的数据依赖:Read-after-Write、Write-after-Write
- private子句 :
private(<variable list>)
- 在private子句内列举的变量在每个线程上都有一个私有副本被创建
note : 一个私有作用域的变量的值在parallel块或者parallel for块开始处是未指定的,在parallel块或parallel for块完成之后也是未指定的
例如一个在块之前声明并初始化的变量,在private子句声明为私有作用域的块中一开始的值是非确定的
- shared子句 :
shared(<variable list>)
- 明确变量的作用域为共享的
default(none)
: 显式要求变量的作用域显式指定- 编译器将要求程序员明确在这个块中使用的每个变量和已经在块之外声明的变量的作用域
对于并行化for循环而言,parallel for指令不是一个通用的解决方案(数据依赖/循环依赖)
- 循环调度 : 各次循环分配给线程的操作(操作系统完成)
- 大部分OpenMP实现粗略地使用块分割 : 如果在串行循环中有n次迭代,那么在并行循环中,前
n/thread_count
个迭代分配给线程0,接下来的n/thread_count
个迭代分配给线程1,以此类推
- 大部分OpenMP实现粗略地使用块分割 : 如果在串行循环中有n次迭代,那么在并行循环中,前
- 不是最优的分配方式 : 如果循环首尾的运算量不同,会造成负载不均衡
- 适合循环划分(轮流分配线程的工作):各次迭代被轮流地一次一个地分配给线程
schedule子句 :
schedule(<type>[, <chunksize>])
- type :
- (1)static : 迭代在循环执行前分配给线程(循环划分)
- (2)dynamic/guided : 迭代在循环执行时分配给线程,在一个线程完成了当前迭代集合后能从运行时系统中请求更多
- (3)auto : 编译器和运行时系统决定调度方式
- (4)runtime : 调度在运行时决定
- chunkszie : 迭代块大小,正整数(迭代块是在顺序循环中连续执行的一块迭代语句)
static调度类型 : 系统以轮转的方式分配chunksize块个迭代给每个线程
- 例如,12个迭代,三个线程
schedule(static, 1)
- Thread 0: 0, 3, 6, 9
- Thread 1: 1, 4, 7, 10
- Thread 2: 2, 5, 8, 11
schedule(static, 2)
- Thread 0: 0, 1, 6, 7
- Thread 1: 2, 3, 8, 9
- Thread 2: 4, 5, 10, 11
schedule(static, total_iterations/thread_count)
相当于大部分OpenMP实现所使用的缺省调度
note : 块大小调整至并行时间最短(均衡点)
- dynamic调度类型
- 迭代被分成块大小为chunksize连续的块
- 每个线程执行一块,当一个线程完成一块时它将从运行时系统请求另一块,直到所有的迭代完成
- guided调度类型
- 迭代分成块,块大小近似等于剩下的迭代数除以线程数(当块完成后新块的大小会变小)
- 若无指定chunksize,则块的大小最小为1;
- 若指定chunksize,则块的最小大小为chunksize,除了最后一块的大小可以比chunksize小
runtim调度类型
- 调度类型由环境变量
OMP_SCHEDULE
指定线程数
- 调度类型由环境变量
$export OMP_SCHEDULE="static, 1"
- 可以通过改变环境变量的值而不用更改源代码达到改变调度类型的目的
调度选择 : 平衡调度开销和负载均衡
(1)如果循环的每次迭代需要几乎相同的计算量,可能默认的调度方式能提供最好的性能
(2)如果随着循环的进行,迭代的计算量线性递增/减,采用较小的chunksize的static调度可能提供最好的性能
(3)如果每次迭代的开销不能事先确定,需要尝试多种不同的调度策略:使用schedule(runtime)
子句,赋予环境变量OMP_SCHEDULE
不同的值,比较不同调度策略下程序的性能
生产者和消费者问题
不适合用parallel for指令或者for指令并行化的问题
- 使用的数据结构 : 队列
- 生产者线程产生对服务器数据的请求,将请求入队
- 消费者线程发现和生成数据消费请求,将请求出队
应用 : 共享内存系统上实现消息传递
每一个线程有一个共享消息队列,当一个线程要向另一个线程“发送消息”时,将消息放入目标线程的消息队列
一个线程接收消息只需从它的消息队列的头部取出消息
|
|
- 显式路障 :
#pragma omp barrier
- 线程遇到路障时被阻塞,直到组中所有线程都到达了这个路障,才能继续往下执行
原子操作 :
#pragma omp atomic
- 保护一条C语言赋值语句所形成的临界区
- 修改相同变量的不同语句被视为同一临界区
x <op>= <expression>
;
x++;
x--;
++x;
--x;
: +
,*
,-
,/
,&
,^
,|
,<<
,>>
note :
不能引用x
只有x的装载和存储确保受保护,对于中含有的不受保护的变量的更新,会使程序结果不可预测
如为 x += y++;
- 命名临界区 :
#pragma omp critical(name)
- OpenMP默认的做法是将所有的临界区代码块作为复合临界区的一部分,可能非常不利于程序的性能
- 强制线程间的互斥会使程序的执行串行化
- 两个用不同名字而定critical指令保护的代码块就可以同时执行
锁(lock) : 一个数据结构和定义在其上的函数组成
- 可以显式地强制对临界区进行互斥访问
- 一个线程进入临界区前尝试调用锁函数上锁(set)
- 如果没有其他的线程正在执行临界区的代码,则此线程获得锁并进入临界区
- 该线程执行完临界区的代码后调用解锁函数释放(unset/relinquish)锁,以便其他线程可以获得锁
- 当一个线程拥有锁时其他线程都不能进入该临界区
|
|
- OpenMP有两种锁 : 简单(simple)锁 和 嵌套(nested)锁
- 简单锁在被释放前只能获得一次
- 嵌套锁在被释放前可以被同一个线程获得多次
|
|
atomic指令是实现互斥访问最快的访问(处理器专门的装载-修改-存储
指令)
OpenMP规范允许atomic指令对程序中所有的atomic指令标记的临界区进行强制互斥访问(类似未命名的critical指令),可能导致不必要的互斥(有些实现把两条修改不同变量的被atomic指令保护的语句视为相同的临界区)
此时可以选择命名的critical指令或者锁
- 互斥可能会引起一些严重的编程问题
- 对同一临界区混用不同的机制不能保证互斥执行
- 互斥不一定保证公平性,某线程可能一直阻塞等待对某个临界区的执行(饥饿现象)
- 嵌套互斥结构或线程以不同的顺序进入多个临界区可能会产生死锁
缓存、缓存一致性、伪共享
- 缓存(cache):解决主存速度比处理器慢而拖慢计算速度的问题
- 原理:时间和空间局部性
- 结构:缓存行和缓存块
现代的微处理器架构使用缓存以减少主存访问时间
典型的体系结构都有专门的硬件确保在不同的处理器芯片上的缓存是一致的
写缺失(write-miss):核试图修改不在缓存中的变量时发生
- 内核必须访问主存
读缺失(read-miss):核试图读取不在缓存中的变量时发生
- 内核必须访问主存
伪共享:两个线程可能访问内存中的不同位置,但是当这两个位置属于同一个缓存行时缓存一致性硬件所表现出来的处理方式就好像这两个线程访问的是内存中的同一个位置
- 如果其中一个线程更新了它所访问的主存地址的值,则另外一个变量试图读取它要访问的主存地址时不得不从主存获取该值
- 硬件强制该线程表现得好像它共享了变量(实际上没有共享任何变量)
- 会大大降低共享内存程序的性能
两种可行的避免伪共享的解决方案
1.用伪变量填充以确保任意一个线程的更新不会影响其他线程的缓存行
2.每个线程在迭代期间使用私有存储,在计算完成后更新共享存储
线程安全性
一个代码块被多个线程同时执行时不会产生错误,则是线程安全的
某些C函数通过声明static存储类型的变量存储输入行,会导致存储在此变量中的值从上一个调用保留到下一个调用;这个变量是共享的,一个线程可以写覆盖另外一个线程的数据
- 这些函数不是线程安全的
pthread
共享内存编程:
不同的处理器尝试更新共享内存区域上同一位置的数据会导致共享区域的内容无法预测。
更新共享区域内存的代码段:临界区(critical section)
- 进程
- 可执行代码
- 栈段
- 堆段
- 资源描述符(操作系统分配,如文件描述符)
- 安全信息(如进程被允许访问的软硬件资源)
- 进程状态信息(运行/等待资源、寄存器/程序计数器值)
大多数操作系统默认状态下一个进程的内存块是私有的
- 其他进程无法绕过OS直接访问
线程:轻量级进程
POSIX线程库:Pthreads
链接Pthreads线程库:
-lpthread
- 某些系统无需添加链接选项编译器会自动链接
- 如:
$gcc src.c -o exec.out -lpthread
- 如:
源代码包含Pthreads线程库头文件:
pthread.h
全局变量为所有线程所共享
启动线程
pthread_t
数据结构存储线程的专有信息- pthread_t对象是不透明对象,其中的数据由OS绑定,用户级代码无法直接访问
- pthread_t对象是线程的唯一标识
pthread_create
函数生成线程
|
|
- 第一个参数为
pthread_t
指针,需在调用前就分配好内存空间
- 第一个参数为
- 第二个参数忽略,
NULL
- 第二个参数忽略,
- 第三个参数为线程函数
- 第四个参数为传递给线程函数的参数
- 返回值用于表示线程调用过程是否正确
线程函数
void* thread_function(void* args_p);
- 常用参数传递方式,包含在
struct thread_data_t
中
- 常用参数传递方式,包含在
- 线程函数通过
thread_data_t *my_data = (thread_data_t*)args_p;
获取参数
- 线程函数通过
参数转换成
void*
型指针再传进去
“单程序,多数据”并行模式
不同线程可以指定不同线程函数运行
运行线程
- 主线程运行main函数
Pthreads不允许程序员控制线程在哪个核上运行(允许指定核心的实现是不可移植的)
线程调度有OS指定
停止线程
pthread_join
函数等待pthread_t对象关联的线程结束
|
|
- 第二个参数接受任意由pthread_t对象所关联的线程产生的返回值
- 通常的接受返回值方法,包含在
struct thread_result
中
- 通常的接受返回值方法,包含在
- 线程函数返回该结构类型的对象的指针
临界区
- 竞争条件(race condition):
- 多个线程都要访问共享变量或共享文件等共享资源时
- 至少一个访问是更新操作
- 这些访问就可能会导致某种错误
忙等待
- 使用共享的标志变量阻塞防止多个线程进入临界区
|
|
- 在忙等待中线程不停地测试某个条件
- 在某个条件满足之前测试都是徒劳的
忙等待有效的前提是严格按照书写顺写执行代码
有些编译器优化会破坏这个条件
- 忙等待不是控制临界区最好的方法
- 浪费CPU周期不停进行循环条件测试
- 对性能可能有极大影响
- 关闭编译器优化会降低性能
互斥量
- mutex : mutually exclusive
处于忙等待状态的线程持续使用CPU造成资源浪费
- 互斥量 : 互斥锁
- 特殊类型的变量
- 通过特殊类型的函数
- 用来限制每次只有一个线程能进入临界区
Pthreads标准为互斥量提供的类型:
pthread_mutex_t
以及一些相关操作:
- 初始化(变量使用前):
|
|
- 第二个参数忽略,赋值
NULL
- 第二个参数忽略,赋值
- 销毁(变量使用完之后):
|
|
int pthread_mutex_lock(pthread_mutex_t mutex_p / in/out */);
int pthread_mutex_unlock(pthread_mutex_t mutex_p / in/out */);
int sem_init(
sem_t semaphore_p / out /,
int shared / in /,
unsigned initial_val / in */ // 信号量初始值
);
int sem_destroy(sem_t semaphore_p / in/out */);
int sem_post(sem_t semaphore_p / in/out */);
int sem_wait(sem_t semaphore_p / in/out */);
void barrier1() {
pthread_mutex_lock(&barrier_mutex);
counter ++;
pthread_mutex_unlock(&barrier_mutex);
while (counter < thread_count) sleep(0);
}
void barrier2(int i_round) {
pthread_mutex_lock(&barrier_mutex);
counter ++;
bool to_post = (counter == thread_count (i_round + 1));
pthread_mutex_unlock(&barrier_mutex);
sem_t barrier_sem = &barrier_sem_even;
if (i_round % 2 == 1)
barrier_sem = &barrier_sem_odd;
if (to_post) {
for (int i = 0; i + 1 < thread_count; ++ i)
sem_post(barrier_sem);
} else {
sem_wait(barrier_sem);
}
}
void barrier3(int i_round) {
pthread_mutex_lock(&barrier_mutex);
counter ++;
if (counter == thread_count * (i_round + 1)) {
pthread_cond_broadcast(&barrier_cond);
} else {
while (pthread_cond_wait(&barrier_cond, &barrier_mutex) != 0) sleep(0);
}
pthread_mutex_unlock(&barrier_mutex);
}
// 解锁一个阻塞的线程
int pthread_cond_signal(pthread_cond_t cond_var_p / in/out */);
// 解锁所有被阻塞的线程
int pthread_cond_broadcast(pthread_cond_t cond_var_p / in/out */);
// 通过互斥量mutex_p阻塞线程,直到其他线程解锁它
int pthread_cond_wait(
pthread_cond_t cond_var_p / in/out /,
pthread_mutex_t mutex_p / in/out /
);
// 初始化:第二个常数通常传递NULL
int pthread_cond_init(
pthread_cond_t cond_p / out /,
const pthread_condattr_t cond_attr_p / in /
);
// 销毁
int pthread_cond_destroy(pthread_cond_t cond_p / out */);
pthread_mutex_unlock(&mutex_p);
wait_on_signal(&cond_var_p);
pthread_mutex_lock(&mutex_p);
pthread_barrier_t b;
pthread_barrier_init(&b, NULL, thread_count);
void barrier4(int i_round) {
pthread_barrier_wait(&b);
}
pthread_barrier_destroy(&b);
// 不使用第二个参数,传递NULL值
int pthread_rwlock_init(
pthread_rwlock_t rwlock_p / out /,
const pthread_rwlockattr_t attr_p / in /
);
int pthread_rwlock_destroy(pthread_rwlock_t rwlock_p / in/out */);
// 加读锁
int pthread_rwlock_rdlock(pthread_rwlock_t rwlock_p / in/out /);
// 加写锁
int pthread_rwlock_wrlock(pthread_rwlock_t rwlock_p / in/out /);
// 解锁
int pthread_rwlock_unlock(pthread_rwlock_t rwlock_p / in/out */);
// ……
#include
int main(int argc, char **argv) {
// …
MPI_Init(&argc, &argv);
//…
MPI_Finalize();
return 0;
}
int commSize, commRank;
MPI_Comm_rank(MPI_COMM_WORLD, &commRank);
MPI_Comm_size(MPI_COMM_WORLD, &commSize);
MPI_Send(
void msg_buf_p / in /,
int msg_size / in /,
MPI_Datatype msg_type / in /,
int dest / in /,
int tag / in /,
MPI_Comm communicator / in */
);
MPI_Recv(
void msg_buf_p / in /,
int buf_size / in /,
MPI_Datatype buf_type / in /,
int source / in /,
int tag / in /,
MPI_Comm communicator / in /,
MPI_Status status_p / out /
);
int MPI_Get_count(
MPI_Status status_p / in /,
MPI_Datatype type / in /,
int count_p / out /
);
int MPI_Reduce(
void input_data_p / in /,
void output_data_p / out /,
int count / in /,
MPI_Datatype datatype / in /,
MPI_Op operator / in /,
int dest_process / in /,
MPI_Comm comm / in /
);
MPI_Reduce(&local_int, &local_int, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
double local_x[N], sum[N];
// do something
MPI_Reduce(local_x, sum, N, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&x, &x, 1, MPI_DOUBLE, MPI_SUM, 0, comm);
int MPI_Allreduce(
void input_data_p / in /,
void output_data_p / out /,
int count / in /,
MPI_Datatype datatype / in /,
MPI_Op operator / in /,
MPI_Comm comm / in /
);
int MPI_Bcast(
void data_p / in/out /,
int count / in /,
MPI_Datatype datatype / in /,
int source_proc / in /,
MPI_Comm comm / in */
);
int MPI_Scatter(
void send_buf_p / in /,
int send_count / in /,
MPI_Datatype send_type / in /,
void recv_buf_p / out /,
int recv_count / in /,
MPI_Datatype recv_type / in /,
int src_proc / in /,
MPI_Comm comm / in /
);
int MPI_Gather(
void send_buf_p / in /,
int send_count / in /,
MPI_Datatype send_type / in /,
void recv_buf_p / out /,
int recv_count / in /,
MPI_Datatype recv_type / in /,
int dest_proc / in /,
MPI_Comm comm / in /
);
int MPI_Allgather(
void send_buf_p / in /,
int send_count / in /,
MPI_Datatype send_type / in /,
void recv_buf_p / out /,
int recv_count / in /,
MPI_Datatype recv_type / in /,
MPI_Comm comm / in /
);
int MPI_Type_create_struct(
int count / in /,
int array_of_blocklengths[] / in /,
MPI_Aint array_of_displacements[] / in /,
MPI_Datatype array_of_types[] / in /,
MPI_Datatype new_type_p / out */
);
itn MPI_Get_address(
void location_p / in /,
MPI_Aint address_p / out /
);
MPI_Aint a_addr, b_addr, n_addr;
MPI_Get_address(&a, &a_addr);
array_of_displacements[0] = 0;
MPI_Get_address(&b, &b_addr);
array_of_displacements[1] = b_addr - a_addr;
MPI_Get_address(&n, &n_addr);
array_of_displacements[2] = n_addr - a_addr;
MPI_Datatype array_of_types[3] = {MPI_DOUBLE, MPI_DOUBLE, MPI_INT};
MPI_Datatype input_mpi_t;
MPI_Type_create_struct(3, array_of_blocklengths, array_of_displacements, array_of_types, &input_mpi_t);
int MPI_Type_commit(MPI_Datatype new_mpi_t_p / in/out */);
int MPI_Type_free(MPI_Datatype old_mpi_t_p / in/out */);
double start_time = MPI_Wtime();
//…
double end_time = MPI_Wtime();
#include
struct timeval tv0, tv1;
gettimeofday(&tv0, NULL);
// …
gettimeofday(&tv1, NULL);
double t = tv1.tv_sec - tv0.tv_sec + 1e-6 * (tv1.tv_usec - tv0.tv_usec);
int MPI_Ssend(
void msg_buf_p / in /,
int msg_size / in /,
MPI_Datatype msg_type / in /,
int dest / in /,
int tag / in /,
MPI_Comm communicator / in */
);
int MPI_Sendrecv(
void send_buf_p / in /,
int send_buf_size / in /,
MPI_Datatype send_buf_type / in /,
int dest / in /,
int send_tag / in /,
void recv_buf_p / out /,
int recv_buf_size / in /,
MPI_Datatype recv_buf_type / in /,
int source / in /,
int recv_tag / in /,
MPI_Comm communicator / in /,
MPI_Status status_p / in */
);
int MPI_Sendrecv_replace(
void buf_p / in /,
int buf_size / in /,
MPI_Datatype buf_type / in /,
int dest / in /,
int send_tag / in /,
int source / in /,
int recv_tag / in /,
MPI_Comm communicator / in /,
MPI_Status status_p / in /
);
```