高性能计算

  • MPI 跨节点并行,并行度高
    • 分布式系统
  • pThread 接近底层,灵活

    • 提供虚拟地编写任何可知线程行为的能力
    • 共享内存
  • openmp 隐藏底层细节

    • 允许编译器和运行时系统决定线程行为的一些细节
    • 使用openmp编写一些并行行为更容易
    • 很难对一些底层的线程交互进行编程
    • 共享内存

openmp

  • 特征:
    • 任务并行化
    • 显式线程同步
  • 编译选项(打开编译器支持) : -fopenmp

  • 头文件 : #include <omp.h>

  • 最基本的parallel指令 : #pragma omp parallel

    • 表明之后的结构化代码块(structured block, 基本块)应被多个线程并行执行
    • 一个结构化代码块是一条C语句或者只有一个入口和一个出口的一组复合C语句,代码中允许调用exit函数
    • (简单地禁止分支语句或离开结构化代码块)
  • 线程(thread) : 执行线程(thread of execution)

    • 共享派生(fork)它们的进行的大部分资源(如标准输入输出)
    • 每个线程有自己的栈和计数器
    • 完成执行后合并(join)到启动它的进程
  • 指定线程数 : #pragma omp parallel num_threads(thread_count)

    • num_threads子句,其中thread_count为并发执行的线程数
    • note : 程序可以启动的线程数可能受系统定义的限制,openmp不保证实际能够启动thread_count个线程(大部分系统能够启动数百、数千个线程)

程序开始执行时,进程启动
程序到达parallel指令时,原线程继续执行,启动另外thread_count - 1个线程

  • 原线程为主线程(master),额外的线程为从线程(slave)
  • 执行并行块的线程集合(主线程 和 从线程)为线程组

  • 隐式路障 : 完成代码块的线程将等待线程组中的所有其他线程完成代码块

    • 之后从线程终止,主线程继续执行之后的(非并行)代码
  • 获取线程编号 : int omp_get_thread_num(void);

    • [0, 1, …, thread_count-1]
  • 获取线程组中的线程数 : int omp_get_num_threads(void);

  • 错误检查 : 检查编译器对OpenMP的支持

    • 不支持OpenMP的编译器只忽略 parallel 指令
1
2
3
#ifdef _OPENMP
#include <omp.h>
#endif
  • 临界区 : #pragma omp critical
    • 临界区 : 一个被多个更新共享资源的线程执行的代码(块),一次只能被一个线程更新
    • 竞争条件(race condition) : 多个线程试图访问一个共享资源,并且至少其中一个访问是更新该共享资源(可能会导致错误)
    • 将引起竞争条件的代码声明为临界区
  • 一个能够被线程组中的所有线程访问的变量拥有共享作用域,一个只能被单个线程访问的变量拥有私有作用域

    • 在parallel块中声明的变量在线程的(私有)栈中分配,都是私有变量(拥有私有作用域)
    • 在parallel块之前声明的变量的缺省作用域是共享的
  • 归约子句 : reduction(<operator> : <variable list>)

    • 如 : #pragma omp parallel reduction(+ : global_result)
    • operator : +, *, &, |, ^, &&, ||之一
    • 归约操作符,满足交换律和结合律的二元运算符(减法不满足,需将减法转成加法)
    • variable list : 归约变量

归约就是将相同的归约操作符重复地应用到操作数序列得到一个结果的运算
所有操作的中间结果存储在同一个变量(归约变量, reduction variable)中

note : 浮点型数据的运算不满足结合律(截断误差的影响)

  • 循环并行化 : #pragma omp parallel for
    • 结构化代码块必须是for循环
    • 系统通过在线程间划分循环迭代并行化for循环
    • 线程间的缺省划分方式是由系统决定的,大部分粗略地使用块划分
    • 和parallel指令的不同 :
    • (1)parallel的块的具体工作一般须线程本身在线程之间划分
    • (2)parallel指令之前声明的变量缺省作用域是共享的,parallel for指令循环变量的缺省作用域是私有的

warning :
OpenMP只会并行化for循环,不会并行化while或do-while循环(需转成等效的for循环)
OpenMP只能并行化在以下情况下确定迭代次数的for循环
(1)由for语句本身(for (...;...;...))确定
(2)在循环之前确定
— 无限循环不能并行化
— 有break语句的循环不能并行化(break添加了另一个从循环退出的出口)
OpenMP只能并行化具有典型结构的for循环
(1)变量index必须是整型或指针类型(非float型浮点数)
(2)表达式start、end、incr必须有一个兼容的类型
(3)表达式start、end、incr不能在循环执行期间改变
(4)在循环期间变量index只能够被for语句的增量表达式修改

note : 循环体中可以有一个exit调用

  • 数据依赖性 : 循环中,迭代中的计算依赖于一个或多个先前的迭代结果
    • OpenMP编译器不检查被parallel for指令并行化的循环所包含的迭代间的依赖关系(由程序员识别)
    • 一个或多个迭代结果依赖于其它迭代的循环,一般不能被OpenMP正确地并行化

循环依赖(loop-carried dependence)
注意缺省情况下任何在循环前声明的变量(除了例外的循环变量)在线程间都是共享的

不可并行化的数据依赖:Read-after-Write、Write-after-Write

  • private子句 : private(<variable list>)
    • 在private子句内列举的变量在每个线程上都有一个私有副本被创建

note : 一个私有作用域的变量的值在parallel块或者parallel for块开始处是未指定的,在parallel块或parallel for块完成之后也是未指定的
例如一个在块之前声明并初始化的变量,在private子句声明为私有作用域的块中一开始的值是非确定的

  • shared子句 : shared(<variable list>)
    • 明确变量的作用域为共享的
  • default(none) : 显式要求变量的作用域显式指定

    • 编译器将要求程序员明确在这个块中使用的每个变量和已经在块之外声明的变量的作用域

对于并行化for循环而言,parallel for指令不是一个通用的解决方案(数据依赖/循环依赖)

  • 循环调度 : 各次循环分配给线程的操作(操作系统完成)
    • 大部分OpenMP实现粗略地使用块分割 : 如果在串行循环中有n次迭代,那么在并行循环中,前n/thread_count个迭代分配给线程0,接下来的n/thread_count个迭代分配给线程1,以此类推
    • 不是最优的分配方式 : 如果循环首尾的运算量不同,会造成负载不均衡
    • 适合循环划分(轮流分配线程的工作):各次迭代被轮流地一次一个地分配给线程
  • schedule子句 : schedule(<type>[, <chunksize>])

    • type :
    • (1)static : 迭代在循环执行前分配给线程(循环划分)
    • (2)dynamic/guided : 迭代在循环执行时分配给线程,在一个线程完成了当前迭代集合后能从运行时系统中请求更多
    • (3)auto : 编译器和运行时系统决定调度方式
    • (4)runtime : 调度在运行时决定
    • chunkszie : 迭代块大小,正整数(迭代块是在顺序循环中连续执行的一块迭代语句)
  • static调度类型 : 系统以轮转的方式分配chunksize块个迭代给每个线程

    • 例如,12个迭代,三个线程
    • schedule(static, 1)
      • Thread 0: 0, 3, 6, 9
      • Thread 1: 1, 4, 7, 10
      • Thread 2: 2, 5, 8, 11
    • schedule(static, 2)
      • Thread 0: 0, 1, 6, 7
      • Thread 1: 2, 3, 8, 9
      • Thread 2: 4, 5, 10, 11

schedule(static, total_iterations/thread_count)相当于大部分OpenMP实现所使用的缺省调度
note : 块大小调整至并行时间最短(均衡点)

  • dynamic调度类型
    • 迭代被分成块大小为chunksize连续的块
    • 每个线程执行一块,当一个线程完成一块时它将从运行时系统请求另一块,直到所有的迭代完成
  • guided调度类型
    • 迭代分成块,块大小近似等于剩下的迭代数除以线程数(当块完成后新块的大小会变小)
    • 若无指定chunksize,则块的大小最小为1;
    • 若指定chunksize,则块的最小大小为chunksize,除了最后一块的大小可以比chunksize小
  • runtim调度类型

    • 调度类型由环境变量OMP_SCHEDULE指定线程数
    • $export OMP_SCHEDULE="static, 1"
    • 可以通过改变环境变量的值而不用更改源代码达到改变调度类型的目的

调度选择 : 平衡调度开销和负载均衡
(1)如果循环的每次迭代需要几乎相同的计算量,可能默认的调度方式能提供最好的性能
(2)如果随着循环的进行,迭代的计算量线性递增/减,采用较小的chunksize的static调度可能提供最好的性能
(3)如果每次迭代的开销不能事先确定,需要尝试多种不同的调度策略:使用schedule(runtime)子句,赋予环境变量OMP_SCHEDULE不同的值,比较不同调度策略下程序的性能

生产者和消费者问题

不适合用parallel for指令或者for指令并行化的问题

  • 使用的数据结构 : 队列
    • 生产者线程产生对服务器数据的请求,将请求入队
    • 消费者线程发现和生成数据消费请求,将请求出队

应用 : 共享内存系统上实现消息传递
每一个线程有一个共享消息队列,当一个线程要向另一个线程“发送消息”时,将消息放入目标线程的消息队列
一个线程接收消息只需从它的消息队列的头部取出消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
for (sent_msgs = 0; sent_msgs < send_max; ++sent_msgs) {
Send_msg();
Try_receive();
}
while (!Done())
Try_receive();
// 发送消息
// 访问消息队列,将消息入队是一个临界区
// 两个线程同时试图检查和更新队尾指针可能会丢失一条由其中一个线程入队的消息
Send_msg() :
mesg = generate_mesg(); // 产生消息
dest = determine_dest_thread(); // 目的进程
#pragma omp critical
Enqueue(queue, dest, my_rank, mesg); // 消息入队
// 接收消息
// 只用一个变量存储队列大小,对该变量的操作就形成临界区 : 两个变量
// 只有消息队列的拥有者能出队
Try_receive() :
queue_size = enqueued - dequeued;
if (queue_size == 0)
return ;
else if (queue_size == 1)
#pragma omp critical
Dequeue(queue, &src, &mesg);
else
Dequeue(queue, &src, &mesg);
Print_message(src, mesg);
// 终止检测
// 每个线程在for循环结束后将done_sending加1
Done() :
queue_size = enqueued - dequeued;
if (queue_size == 0 && done_sending == thread_count)
return true;
esle
return false;
  • 显式路障 : #pragma omp barrier
    • 线程遇到路障时被阻塞,直到组中所有线程都到达了这个路障,才能继续往下执行
  • 原子操作 : #pragma omp atomic

    • 保护一条C语言赋值语句所形成的临界区
    • 修改相同变量的不同语句被视为同一临界区
    • x <op>= <expression>;
    • x++;
    • x--;
    • ++x;
    • --x;
    • : +, *, -, /, &, ^, |, <<, >>

note : 不能引用x
只有x的装载和存储确保受保护,对于中含有的不受保护的变量的更新,会使程序结果不可预测
x += y++;

  • 命名临界区 : #pragma omp critical(name)
    • OpenMP默认的做法是将所有的临界区代码块作为复合临界区的一部分,可能非常不利于程序的性能
    • 强制线程间的互斥会使程序的执行串行化
    • 两个用不同名字而定critical指令保护的代码块就可以同时执行
  • 锁(lock) : 一个数据结构和定义在其上的函数组成

    • 可以显式地强制对临界区进行互斥访问
    • 一个线程进入临界区前尝试调用锁函数上锁(set)
    • 如果没有其他的线程正在执行临界区的代码,则此线程获得锁并进入临界区
    • 该线程执行完临界区的代码后调用解锁函数释放(unset/relinquish)锁,以便其他线程可以获得锁
    • 当一个线程拥有锁时其他线程都不能进入该临界区
1
2
3
4
5
6
7
8
9
10
11
12
13
/* executed by one thread */
initialize the lock data structure;
/* executed by multiple threads */
attempt to lock or set the lock data structure;
critical section;
unlock or unset the lock data structure;
/* executed by one thread */
destroy the lock data structure;
  • OpenMP有两种锁 : 简单(simple)锁 和 嵌套(nested)锁
    • 简单锁在被释放前只能获得一次
    • 嵌套锁在被释放前可以被同一个线程获得多次
1
2
3
4
5
// OpenMP simple lock
void omp_init_lock(omp_lock_t* lock_p /* out */);
void omp_set_lock(omp_lock_t* lock_p /* in/out */);
void omp_unset_lock(omp_lock_t* lock_p /* in/out */);
void omp_destroy_lock(omp_lock_t* lock_p /* in/out */);

atomic指令是实现互斥访问最快的访问(处理器专门的装载-修改-存储指令)
OpenMP规范允许atomic指令对程序中所有的atomic指令标记的临界区进行强制互斥访问(类似未命名的critical指令),可能导致不必要的互斥(有些实现把两条修改不同变量的被atomic指令保护的语句视为相同的临界区)
此时可以选择命名的critical指令或者锁

  • 互斥可能会引起一些严重的编程问题
    • 对同一临界区混用不同的机制不能保证互斥执行
    • 互斥不一定保证公平性,某线程可能一直阻塞等待对某个临界区的执行(饥饿现象)
    • 嵌套互斥结构或线程以不同的顺序进入多个临界区可能会产生死锁

缓存、缓存一致性、伪共享

  • 缓存(cache):解决主存速度比处理器慢而拖慢计算速度的问题
    • 原理:时间和空间局部性
    • 结构:缓存行和缓存块

现代的微处理器架构使用缓存以减少主存访问时间

  • 典型的体系结构都有专门的硬件确保在不同的处理器芯片上的缓存是一致的

  • 写缺失(write-miss):核试图修改不在缓存中的变量时发生

    • 内核必须访问主存
  • 读缺失(read-miss):核试图读取不在缓存中的变量时发生

    • 内核必须访问主存
  • 伪共享:两个线程可能访问内存中的不同位置,但是当这两个位置属于同一个缓存行时缓存一致性硬件所表现出来的处理方式就好像这两个线程访问的是内存中的同一个位置

    • 如果其中一个线程更新了它所访问的主存地址的值,则另外一个变量试图读取它要访问的主存地址时不得不从主存获取该值
    • 硬件强制该线程表现得好像它共享了变量(实际上没有共享任何变量)
    • 会大大降低共享内存程序的性能

两种可行的避免伪共享的解决方案
1.用伪变量填充以确保任意一个线程的更新不会影响其他线程的缓存行
2.每个线程在迭代期间使用私有存储,在计算完成后更新共享存储

线程安全性

  • 一个代码块被多个线程同时执行时不会产生错误,则是线程安全的

  • 某些C函数通过声明static存储类型的变量存储输入行,会导致存储在此变量中的值从上一个调用保留到下一个调用;这个变量是共享的,一个线程可以写覆盖另外一个线程的数据

    • 这些函数不是线程安全的

pthread

共享内存编程:
不同的处理器尝试更新共享内存区域上同一位置的数据会导致共享区域的内容无法预测。
更新共享区域内存的代码段:临界区(critical section)

  • 进程
    • 可执行代码
    • 栈段
    • 堆段
    • 资源描述符(操作系统分配,如文件描述符)
    • 安全信息(如进程被允许访问的软硬件资源)
    • 进程状态信息(运行/等待资源、寄存器/程序计数器值)
  • 大多数操作系统默认状态下一个进程的内存块是私有的

    • 其他进程无法绕过OS直接访问
  • 线程:轻量级进程

  • POSIX线程库:Pthreads

  • 链接Pthreads线程库: -lpthread

    • 某些系统无需添加链接选项编译器会自动链接
    • 如: $gcc src.c -o exec.out -lpthread
  • 源代码包含Pthreads线程库头文件: pthread.h

全局变量为所有线程所共享

启动线程

  • pthread_t 数据结构存储线程的专有信息
    • pthread_t对象是不透明对象,其中的数据由OS绑定,用户级代码无法直接访问
    • pthread_t对象是线程的唯一标识
  • pthread_create 函数生成线程

1
2
3
4
5
6
int pthread_create(
pthread_t* thread_p /*out*/,
const pthread_attr_t* attr_p /*in*/,
void* (*start_routine)(void*) /*in*/,
void* arg_p /*in*/
);
    • 第一个参数为pthread_t指针,需在调用前就分配好内存空间
    • 第二个参数忽略,NULL
    • 第三个参数为线程函数
    • 第四个参数为传递给线程函数的参数
    • 返回值用于表示线程调用过程是否正确
  • 线程函数 void* thread_function(void* args_p);

    • 常用参数传递方式,包含在struct thread_data_t
    • 线程函数通过thread_data_t *my_data = (thread_data_t*)args_p;获取参数
  • 参数转换成void*型指针再传进去

“单程序,多数据”并行模式
不同线程可以指定不同线程函数运行

运行线程

  • 主线程运行main函数

Pthreads不允许程序员控制线程在哪个核上运行(允许指定核心的实现是不可移植的)
线程调度有OS指定

停止线程

  • pthread_join函数等待pthread_t对象关联的线程结束
1
2
3
4
int pthread_join(
pthread_t thread /*in*/;
void** ret_val_p /*out*/
);
    • 第二个参数接受任意由pthread_t对象所关联的线程产生的返回值
    • 通常的接受返回值方法,包含在struct thread_result
    • 线程函数返回该结构类型的对象的指针

临界区

  • 竞争条件(race condition):
    • 多个线程都要访问共享变量或共享文件等共享资源
    • 至少一个访问是更新操作
    • 这些访问就可能会导致某种错误

忙等待

  • 使用共享的标志变量阻塞防止多个线程进入临界区
1
2
3
4
5
循环(无穷) {
判断标志变量:
标志忙: continue;
标志闲: 进入临界区(如写共享变量); break;
}
  • 在忙等待中线程不停地测试某个条件
  • 在某个条件满足之前测试都是徒劳的

忙等待有效的前提是严格按照书写顺写执行代码
有些编译器优化会破坏这个条件

  • 忙等待不是控制临界区最好的方法
    • 浪费CPU周期不停进行循环条件测试
    • 对性能可能有极大影响
    • 关闭编译器优化会降低性能

互斥量

  • mutex : mutually exclusive

处于忙等待状态的线程持续使用CPU造成资源浪费

  • 互斥量 : 互斥锁
    • 特殊类型的变量
    • 通过特殊类型的函数
    • 用来限制每次只有一个线程能进入临界区
  • Pthreads标准为互斥量提供的类型:pthread_mutex_t

  • 以及一些相关操作:

    • 初始化(变量使用前):
1
2
3
4
int pthread_mutex_init(
pthread_mutex_t* mutex_p /*out*/,
const pthread_mutexattr_t* attr_p /*in*/
);
    • 第二个参数忽略,赋值NULL
    • 销毁(变量使用完之后):
1
2
3
4
int pthread_mutex_destroy(pthread_mutex_t* mutex_p /* in/out */);
```
- - 获得临界区的使用权:

int pthread_mutex_lock(pthread_mutex_t mutex_p / in/out */);

1
2
- - 退出临界区:

int pthread_mutex_unlock(pthread_mutex_t mutex_p / in/out */);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
> 调用 ``pthread_mutex_lock`` 会使线程**阻塞**等待直到没有其他线程在临界区
> 调用 ``pthread_mutex_unlock`` 会通知系统,线程已完成临界区代码的执行
> Pthreads无法保证线程按调用 ``pthread_mutex_lock`` 的顺序获得进入临界区的锁
> 有限个线程尝试获得锁的所有权,理论上每一个线程都会获得锁
> 使用忙等待的多线程程序在线程数超过核心个数时性能会下降
### 信号量
- 信号量 : semaphore
- - 一种特殊类型的unsigned int无符号整型变量
- - ``#include <semaphore.h>``
- - 信号量函数

int sem_init(
sem_t semaphore_p / out /,
int shared /
in /,
unsigned initial_val /
in */ // 信号量初始值
);

int sem_destroy(sem_t semaphore_p / in/out */);

int sem_post(sem_t semaphore_p / in/out */);

int sem_wait(sem_t semaphore_p / in/out */);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 忽略 ``sem_init`` 的第二个参数,只需传入常数0
> 生产者-消费者同步模型:一个线程需要等待另一个线程执行某种操作的同步方式
### 路障和条件变量
- 路障(barrier):同步点
- - 通过保证所有线程在程序中处于同一个位置来同步线程
- - 有线程未抵达同一个路障时其它线程阻塞在路障处
- - 应用:记录“最慢”的线程的时间、调试程序
- Pthreads实现路障的四种方法:
- - 忙等待 + 互斥量
- - 信号量
- - 条件变量(conditional variable)
- - pthread路障对象
- **忙等待+互斥量**实现路障

void barrier1() {
pthread_mutex_lock(&barrier_mutex);
counter ++;
pthread_mutex_unlock(&barrier_mutex);
while (counter < thread_count) sleep(0);
}

1
2
3
4
5
- - 存在问题:线程处于忙等待循环时浪费CPU周期;程序中线程数多于CPU核心数时程序性能会直线下降
- - 有多少个路障就必须有多少个不同的共享``counter``变量来计数
- **信号量+互斥量**实现路障

void barrier2(int i_round) {
pthread_mutex_lock(&barrier_mutex);
counter ++;
bool to_post = (counter == thread_count (i_round + 1));
pthread_mutex_unlock(&barrier_mutex);
sem_t
barrier_sem = &barrier_sem_even;
if (i_round % 2 == 1)
barrier_sem = &barrier_sem_odd;
if (to_post) {
for (int i = 0; i + 1 < thread_count; ++ i)
sem_post(barrier_sem);
} else {
sem_wait(barrier_sem);
}
}

1
2
3
4
5
- - 使用两个信号量实现多个路障,消除重复使用单个信号量存在的竞争条件
- - 线程被阻塞在``sem_wait``不会消耗CPU周期,性能更佳
- **条件变量+互斥量**实现路障

void barrier3(int i_round) {
pthread_mutex_lock(&barrier_mutex);
counter ++;
if (counter == thread_count * (i_round + 1)) {
pthread_cond_broadcast(&barrier_cond);
} else {
while (pthread_cond_wait(&barrier_cond, &barrier_mutex) != 0) sleep(0);
}
pthread_mutex_unlock(&barrier_mutex);
}

1
2
3
- - **条件变量**是一个数据对象,允许线程在某个特定条件或事件发生前都处于挂起状态;事件/条件发生时另一个线程可以通过*信号*唤醒挂起的线程
- - 一个条件变量总是和一个互斥量相关联

// 解锁一个阻塞的线程
int pthread_cond_signal(pthread_cond_t cond_var_p / in/out */);

// 解锁所有被阻塞的线程
int pthread_cond_broadcast(pthread_cond_t cond_var_p / in/out */);

// 通过互斥量mutex_p阻塞线程,直到其他线程解锁它
int pthread_cond_wait(
pthread_cond_t cond_var_p / in/out /,
pthread_mutex_t
mutex_p / in/out /
);

// 初始化:第二个常数通常传递NULL
int pthread_cond_init(
pthread_cond_t cond_p / out /,
const pthread_condattr_t
cond_attr_p / in /
);
// 销毁
int pthread_cond_destroy(pthread_cond_t cond_p / out */);

1
2
- - pthread_cond_wait相当于调用了:

pthread_mutex_unlock(&mutex_p);
wait_on_signal(&cond_var_p);
pthread_mutex_lock(&mutex_p);

1
2
3
4
- - 若被*pthread_cond_broadcast*和*pthread_cond_signal*以外的事件解除阻塞,则*pthread_cond_wait*的返回值不为0
- **pthreads路障**实现路障

pthread_barrier_t b;
pthread_barrier_init(&b, NULL, thread_count);

void barrier4(int i_round) {
pthread_barrier_wait(&b);
}

pthread_barrier_destroy(&b);

1
2
3
4
### 读写锁
- 初始化读写锁:

// 不使用第二个参数,传递NULL值
int pthread_rwlock_init(
pthread_rwlock_t rwlock_p / out /,
const pthread_rwlockattr_t
attr_p / in /
);

1
2
- 释放读写锁:

int pthread_rwlock_destroy(pthread_rwlock_t rwlock_p / in/out */);

1
2
- 加锁、解锁:

// 加读锁
int pthread_rwlock_rdlock(pthread_rwlock_t rwlock_p / in/out /);
// 加写锁
int pthread_rwlock_wrlock(pthread_rwlock_t
rwlock_p / in/out /);
// 解锁
int pthread_rwlock_unlock(pthread_rwlock_t rwlock_p / in/out */);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
- rdlock 为**读操作**对读写锁加锁,多个线程能通过调用这个函数同时获得锁,但任何请求**写锁**的线程都将阻塞在写锁函数的调用上,
- wrlock 为**写操作**对读写锁加锁,同一时间内只有一个线程能获得写锁,并且当写锁生效时会阻塞其他**读写锁**的获得
- 三种方法确保链表的并发读写的正确性
- 1. 全局互斥量:读和写操作都先锁再执行操作,后解锁
- 2. 每个结点一个互斥量:分开加解锁
- 3. 全局读写锁
> 缓冲、缓冲一致性和伪共享
> 线程安全性
- 不同维度(8000000 x 8; 8000 x 8000; 8 x 8000000)矩阵向量乘的运行时间和效率的差异:
- - 缓存的时间和空间局部性原理
- - 缓存行/缓存块
- - 写缺失(write-miss)、读缺失(read-miss)、伪共享(false sharing)
## MPI
使用消息传递对分布式内存进行编程
MPI:消息传递接口(Message-Passing Interface)
运行在一个核-内存对上的程序为一个**进程**,两个进程间通过发送和接收函数进行通信
### 编译与运行
- 编译:``mpicc``; ``mpic++``; ``mpicxx``
- - C/C++语言的包装脚本(wrapper script)
- - 使用``GUN gcc/g++``编译器
- 运行:
- - ``mpirun -np numOfProcess ./xxx``
- - ``mpiexec -n numOfProcess ./xxx``
### MPI程序
- 头文件:``mpi.h``
- - 包括了MPI函数的原型、宏定义、类型定义等以及编译MPI程序所需要的全部定义与声明
- MPI定义的标识符都由字符串``MPI_``开始
- - 下划线后第一个字母大写表示函数名与MPI定义类型
- - MPI定义的宏和常量的所有字母都是大写的
- ``MPI_Init``:告知MPI系统进行所有必要的初始化设置
- - 语法结构:``int MPI_Init(int* argc_p /* in/out */, char *** arg_p /* in/out */);``
- - 不适用参数时指定为 ``NULL``
- - 为消息缓冲区分配存储空间;为进程指定进程号
- - 程序调用``MPI_Init``之前不应该调用其他MPI函数
- ``MPI_Finalize``告知MPI系统使用完毕,为MPI分配的任何资源都可以释放
- - 语法结构:``MPI_Finalize(void);``
- - 程序调用``MPI_Finalize``后不应该再调用MPI函数
- MPI程序基本框架:

// ……

#include

int main(int argc, char **argv) {
// …
MPI_Init(&argc, &argv);
//…
MPI_Finalize();
return 0;
}

1
2
3
4
5
6
7
8
### 通信子
- 通信子(communicator):一组可以互相发送消息的进程集合
- - MPI_Init在用户启动程序时定义由用户启动的所有进程组成的通信子:``MPI_COMM_WORLD``
- - 获取进程数:``int MPI_Comm_size(MPI_Comm comm /* in */, int* comm_sz_p /* out */);``
- - 获取进程编号:``int MPI_Comm_rank(MPI_Comm comm /* in */, int* my_rank_p /* out */);``
- - ``MPI_Comm``:通信子

int commSize, commRank;
MPI_Comm_rank(MPI_COMM_WORLD, &commRank);
MPI_Comm_size(MPI_COMM_WORLD, &commSize);

1
2
3
4
5
6
7
8
9
- SPMD:单程序多数据流(Single Program, Multiple Data, SPMD)
- - 简单地让进程按照其进程号匹配程序分支
### 通信
- MPI_Send和MPI_Recv称为**点对点通信**(point-to-point communication)
- 点对点消息发送

MPI_Send(
void msg_buf_p / in /,
int msg_size /
in /,
MPI_Datatype msg_type /
in /,
int dest /
in /,
int tag /
in /,
MPI_Comm communicator /
in */
);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
- - 第一个参数指向包含消息内容的内存块的指针
- - 第二个参数指定发送的消息数据量(字符串要长度加1<'\0'占一个字符>)
- - 第三个参数指定发送的数据类型
- - 第四个参数指定接收消息的进程号
- - 第五个参数指定标签以区分不同消息
- - 第六个参数指定通信子(指定通信范围)
| MPI数据类型 | C语言数据类型 |
| :------ | :------ |
| MPI_CHAR | signed char |
| MPI_SHORT | signed short int |
| MPI_INT | signed int |
| MPI_LONG | signed long int |
| MPI_LONG_LONG | signed long long |
| MPI_UNSIGNED_CHAR | unsigned char |
| MPI_UNSIGNED_SHORT | unsigned short int |
| MPI_UNSIGNED | unsigned int |
| MPI_UNSIGNED_LONG | unsigned long int |
| MPI_FLOAT | float |
| MPI_DOUBLE | double |
| MPI_LONG_DOUBLE | long double |
| MPI_BYTE | |
| MPI_PACKED | |
- 点对点消息接收

MPI_Recv(
void msg_buf_p / in /,
int buf_size /
in /,
MPI_Datatype buf_type /
in /,
int source /
in /,
int tag /
in /,
MPI_Comm communicator /
in /,
MPI_Status
status_p / out /
);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
- - 第一个参数指向用于接收消息的内存块的指针
- - 第二个参数指定内存块中可存储对象的数量
- - 第三个参数指定对象类型
- - 第四个参数指定发送消息的进程号
- - 第五个参数指定匹配要接收的消息的tag
- - 第六个参数指定通信子(指定通信范围)
- - 第七个参数可以不使用,赋予``MPI_STATUS_IGNORE``
- 消息匹配
- - q号进程调用``MPI_Send(send_buf_p, send_buf_sz, send_type, dest, send_tag, send_comm);``
- - r号进程调用``MPI_Recv(recv_buf_p, recv_buf_sz, recv_type, src, recv_tag, recv_comm, &status);``
- - 消息能够被接收当且仅当:
- - 1. ``recv_comm = send_comm``
- - 2. ``recv_tag = send_tag``
- - 3. ``dest = r``
- - 4. ``src = q``
- - 5. 兼容的缓冲区:``recv = send_type`` 同时 ``recv_sz >= send_buf_sz``
> MPI_Recv通配符(wildcard):特殊常量
> 接收来自任何源的消息:MPI_ANY_SOURCE
> 接收来自任何标签的消息:MPI_ANY_TAG
> 1.只有接受者可以使用通配符参数(MPI通信机制“推”(push)而非“拉”(pull))
> 2.通信子参数没有通配符
- ``status_p``参数
- - MPI类型``MPI_Status``是一个有至少三个成员的结构:``MPI_SOURCE``, ``MPI_TAG``, ``MPI_ERROR``
- - ``MPI_Status status``作参数传入
- - 消息接收者确定发送者和标签:``status.MPI_SOURCE``; ``status.MPI_TAG``
- - 消息接收者确定接收到的数据量:``MPI_Get_count(&status, recv_type, &count)``

int MPI_Get_count(
MPI_Status status_p / in /,
MPI_Datatype type /
in /,
int
count_p / out /
);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
### MPI_Send和MPI_Recv的细节
- MPI具体实现指定MPI_Send
- - **缓冲**发送消息
- - **阻塞**(block)发送消息
> 典型的实现方法:
> 默认的消息截止大小("cutoff" message size)
> 消息长度小于截止大小则缓冲
> 消息长度大于截止大小则阻塞
- MPI_Recv总是**阻塞**的
> 有指定缓冲/阻塞的替代函数
- MPI消息是不可逾越的(nonovertaking)
- - 同一进程发送的多条消息必须是按发送顺序到达的
> 注意点:
> 试图接收消息的进程接收不到匹配的消息会永久阻塞(进程**悬挂**)
> 缓冲发送消息没有匹配的接受则消息会丢失
### I/O处理
- 几乎所有的MPI实现都允许MPI_COMM_WORLD里的所有进程访问``stdout``和``stderr``
- 但不提供对I/O设备访问的自动调度:多个进程写标准输出的顺序是无法预测的(甚至是可抢占的)
> 0号进程接收其他进程的消息,统一由0号进程输出
- 多部分MPI实现只允许MPI_COMM_WORLD里的0号进程访问``stdin``
> 0号进程负责读取数据,发送给其它进程
### 集合通信
- 树形通信结构能减少需要“归约”的通信过程中某个进程(通信瓶颈的进程)接收的消息数和计算量
- MPI中涉及通信子中所有进程的通信函数称为**集合通信**(collective communication)
- MPI_Reduce : 归约
- - 函数形式:

int MPI_Reduce(
void input_data_p / in /,
void
output_data_p / out /,
int count / in /,
MPI_Datatype datatype / in /,
MPI_Op operator / in /,
int dest_process / in /,
MPI_Comm comm / in /
);

1
2
- - count参数大于0,MPI_Reduce函数可以应用到数组上

MPI_Reduce(&local_int, &local_int, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

double local_x[N], sum[N];
// do something
MPI_Reduce(local_x, sum, N, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- - MPI中预定义的归约操作符
| 运算符值 | 含义 |
| :------ | :------ |
| MPI_MAX | 最大值 |
| MPI_MIN | 最小值 |
| MPI_SUM | 累加和 |
| MPI_PROD | 累成积 |
| MPI_LAND | 逻辑与 |
| MPI_BAND | 按位与 |
| MPI_LOR | 逻辑或 |
| MPI_BOR | 按位或 |
| MPI_LXOR | 逻辑异或 |
| MPI_BXOR | 按位异或 |
| MPI_MAXLOC | 最大值和最大值所在位置 |
| MPI_MINLOC | 最小值和最小值所在位置 |
> 集合通信相对于点对点通信的特点
> 1. 通信子中所有进程必须调用相同的集合通信函数
> 2. 每个进程传递给MPI集合通信函数的参数必须是相容的,如接收进程号一致
> 3. 参数output_data_p只用在进程dest_process,其他进程只需传递一个可为NULL的相应类型的实际参数
> 4. 集合通信函数不使用标签,只通过通信子和调用顺序匹配消息(符合同一通信子的消息按函数的实际调用顺序接收,和其他因素无关);而点对点通信通过标签和通信子匹配
> 一个非法的调用方式,结果不可预测:MPI禁止输入或输出参数作为其他参数的别名

MPI_Reduce(&x, &x, 1, MPI_DOUBLE, MPI_SUM, 0, comm);

1
2
3
4
5
6
- 蝶形通信结构可以让进程之间互换部分结果
- - 对于全局归约操作,树形通信结构需要6层通信,蝶形通信结构需要3层通信
- MPI_Allreduce : 全局归约
- - 函数形式:

int MPI_Allreduce(
void input_data_p / in /,
void
output_data_p / out /,
int count / in /,
MPI_Datatype datatype / in /,
MPI_Op operator / in /,
MPI_Comm comm / in /
);

1
2
3
4
5
- 相比MPI_Reduce少了dest_process参数:所有进程都能得到结果
- 广播(broadcast)
- - 函数形式:

int MPI_Bcast(
void data_p / in/out /,
int count /
in /,
MPI_Datatype datatype /
in /,
int source_proc /
in /,
MPI_Comm comm /
in */
);

1
2
3
4
5
6
7
8
9
10
- - 属于一个进程的数据被发送到通信子中的所有进程;采用树形通信结构
- 数据分发
- - 划分方式
- - 1. 块划分:按顺序分配一定数目的块到各进程
- - 2. 循环划分:依次分给一个进程一个块,直到分完
- - 3. 块-循环划分:依次分给一个进程多个块,直到分完
- 散射函数:Scatter

int MPI_Scatter(
void send_buf_p / in /,
int send_count /
in /,
MPI_Datatype send_type /
in /,
void
recv_buf_p / out /,
int recv_count / in /,
MPI_Datatype recv_type / in /,
int src_proc / in /,
MPI_Comm comm / in /
);

1
2
3
4
5
6
7
- - 将send_buf_p引用的数据分成通信子comm进程数的分数,依次分发给0, 1, 2...号进程
- - send_count 和 recv_count 都为每份的数据量
- - 块划分方式
- - 其他划分方式:重排发送缓冲数组的数据
- 聚集函数:Gather

int MPI_Gather(
void send_buf_p / in /,
int send_count /
in /,
MPI_Datatype send_type /
in /,
void
recv_buf_p / out /,
int recv_count / in /,
MPI_Datatype recv_type / in /,
int dest_proc / in /,
MPI_Comm comm / in /
);

1
2
3
4
5
- - 将通信子comm中所有进程的数据聚集到dest_proc进程recv_buf_p中
- - recv_count为接收到的一个进程的数据量
- 全局聚集:Allgather

int MPI_Allgather(
void send_buf_p / in /,
int send_count /
in /,
MPI_Datatype send_type /
in /,
void
recv_buf_p / out /,
int recv_count / in /,
MPI_Datatype recv_type / in /,
MPI_Comm comm / in /
);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- - 将通信子comm中每个进程的send_buf_p串联起来分发到每个进程的recv_buf_p中
- - recv_count = send_count
### MPI派生数据类型
> 几乎所有的分布式内存系统中通信比本地开销大很多
> 提高程序性能:减少收发消息数量
- 1. count参数:用于将连续的数组元素集合起来组成一条单独的消息
- 2. 派生数据类型
- 3. MPI_Pack/MPI_Unpack函数
- MPI中**派生数据类型**通过同时存储数据项的类型及其在内存中的相对位置,可以表示内存中数据项的任意集合
- - 一个派生数据类型是由系列MPI基本数据类型和每个数据类型的偏移所组成的
- - 创建

int MPI_Type_create_struct(
int count / in /,
int array_of_blocklengths[] / in /,
MPI_Aint array_of_displacements[] / in /,
MPI_Datatype array_of_types[] / in /,
MPI_Datatype new_type_p / out */
);

1
2
3
4
5
6
- - 参数count指定数据类型中元素个数
- - array_of_blocklengths指定每个元素的数据个数(1个或多个元素可以是数组)
- - array_of_displacements指定每个元素距离消息起始位置的偏移量(单位为字节)
- - 找到内存单元的地址

itn MPI_Get_address(
void location_p / in /,
MPI_Aint
address_p / out /
);

1
2
3
4
5
- - ``MPI_Aint``是足以表示系统地址的整数类型
- - 将``location_p``指向的内存单元的地址保存在``address_p``中
- - 例:

MPI_Aint a_addr, b_addr, n_addr;

MPI_Get_address(&a, &a_addr);
array_of_displacements[0] = 0;
MPI_Get_address(&b, &b_addr);
array_of_displacements[1] = b_addr - a_addr;
MPI_Get_address(&n, &n_addr);
array_of_displacements[2] = n_addr - a_addr;

MPI_Datatype array_of_types[3] = {MPI_DOUBLE, MPI_DOUBLE, MPI_INT};

MPI_Datatype input_mpi_t;
MPI_Type_create_struct(3, array_of_blocklengths, array_of_displacements, array_of_types, &input_mpi_t);

1
2
- - 指定派生数据类型使之可用

int MPI_Type_commit(MPI_Datatype new_mpi_t_p / in/out */);

1
2
3
4
- - 可以像使用MPI基本数据类型一样使用派生数据类型
- - 释放派生数据类型可能所占的额外的存储空间

int MPI_Type_free(MPI_Datatype old_mpi_t_p / in/out */);

1
2
3
4
### MPI程序性能
- 计时函数:``double MPI_Wtime(void);``

double start_time = MPI_Wtime();
//…

double end_time = MPI_Wtime();

1
2
- 串行程序计时:``gettimeofday();``

#include
struct timeval tv0, tv1;
gettimeofday(&tv0, NULL);
// …
gettimeofday(&tv1, NULL);

double t = tv1.tv_sec - tv0.tv_sec + 1e-6 * (tv1.tv_usec - tv0.tv_usec);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
> MPI_Wtime()返回墙上时钟时间(所经历的全部时间,包括空闲等待时间)
> C语言中的clock函数返回的是CPU时间(用户代码、库函数以及系统调用函数所消耗的时间),但不包括空闲时间
> 并行程序中许多情况下都是空闲等待状态
- MPI路障:``int MPI_Barrier(MPI_Comm comm /* in */);``
- - 可在开始计时之前调用
- 加速比:衡量串行运算和并行运算时间之间关系
- - $S(n, p) = \frac{T_{serial(n)}}{T_{parallel(n, p)}}$
- - ``n``:数据规模
- - ``p``:进程数
> 最理想的加速比是``p``:**线性加速比**
- 并行效率:“每个进程”的加速比
- - $E(n, p) = \frac{S(n, p)}{p} = \frac{T_{setial(n)}}{p * T_{parallel(n, p)}}$
- - 线性加速比相当于并行效率为1.0
- - 通常效率小于1
> 并行开销:
> 1. 不同线程/进程间的通信开销
> 2. 分解任务产生的额外开销(计算)
- 可扩展性:如果问题的规模以一定的速率增大,但效率没有随着进程数的增加而降低,那么程序就可认为是可扩展的
- - 强可扩展性:程序可以在不增加问题规模的前提下维持恒定效率(n固定,p增大)
- - 弱可扩展性:问题规模增加需要通过增大进程数维持程序效率的(n固定,p固定)
### MPI程序安全性
- ``MPI_PROC_NULL``:由MPI库定义的一个常量
- - 在点对点通信中作为源进程号或目标进程号,调用后会直接返回而不产生任何通信
> MPI允许MPI_Send函数以两种不同的方式实现,阻塞或异步,或导致死锁或进程挂起
> 可能所有进程都阻塞在MPI_Send上而无法调用MPI_Recv
> 不安全
- MPI提供替代的send通信函数(s代表同步)

int MPI_Ssend(
void msg_buf_p / in /,
int msg_size /
in /,
MPI_Datatype msg_type /
in /,
int dest /
in /,
int tag /
in /,
MPI_Comm communicator /
in */
);

1
2
- MPI提供调度通信的方法:使程序不再挂起或崩溃

int MPI_Sendrecv(
void send_buf_p / in /,
int send_buf_size /
in /,
MPI_Datatype send_buf_type /
in /,
int dest /
in /,
int send_tag /
in /,
void
recv_buf_p / out /,
int recv_buf_size / in /,
MPI_Datatype recv_buf_type / in /,
int source / in /,
int recv_tag / in /,
MPI_Comm communicator / in /,
MPI_Status status_p / in */
);

1
2
3
4
5
- - 分别执行一次阻塞式消息发送和一次消息接收
- - dest,source参数可以相同也可以不同
- - 若发送和接收使用同一个缓冲区

int MPI_Sendrecv_replace(
void buf_p / in /,
int buf_size /
in /,
MPI_Datatype buf_type /
in /,
int dest /
in /,
int send_tag /
in /,
int source /
in /,
int recv_tag /
in /,
MPI_Comm communicator /
in /,
MPI_Status
status_p / in /
);
```