loop_unroll

循环展开(Loop Unrolling)

循环展开的概念

​ GPU 编程偏向于使用确定的代码,GPU没有分支预测能力,所有每一个分支都是要执行的,所以在内核里尽量别写分支,分支包括什么?包括 if 还有 for 之类的循环语句,例如:

1
2
3
for (itn i=0;i<tid;i++) {
// to do something
}

​ 如果上面这段代码出现在内核中,就会有分支,因为一个线程束第一个线程和最后一个线程 tid 相差32(如果线程束大小是32的话) 那么每个线程执行的时候,for 终止时完成的计算量都不同,这就有线程要等待,这也就产生了分支

循环展开是一个尝试通过减少分支出现的频率和循环维护指令来优化循环的技术
上面这句属于书上的官方说法,我们来看看例子,不止并行算法可以展开,传统串行代码展开后效率也能一定程度的提高,因为省去了判断和分支预测失败所带来的迟滞

循环控制部分的开销可以分为 3 类:

  • 一次性的开销,例如 i=0 初始化;
  • 每次循环的控制开销,例如 i<n 判断;
  • 每次循环后的操作,例如 i++ 递增。

因此,在循环体之外,这些循环控制的开销很多时候不能无视,因为它们也将在 SM 上执行,占用计算资源。能否将这些额外的控制占用的成本降低到最小,则是取得性能提升的一个关键因素

手动循环展开

​ 以内核中的 C++ 循环展开为例:

1
2
3
for (int i=0;i<100;i++) {
a[i]=b[i]+c[i];
}

​ 这个是最传统的写法,我们下面进行循环展开:

1
2
3
4
5
6
for (int i=0;i<100;i+=4) {
a[i+0]=b[i+0]+c[i+0];
a[i+1]=b[i+1]+c[i+1];
a[i+2]=b[i+2]+c[i+2];
a[i+3]=b[i+3]+c[i+3];
}

​ 很简单,代码的改动就是修改循环体的内容,把本来循环自己搞定的东西,我们自己列出来了,这样做的好处,从串行较多来看是减少了条件判断的次数。但是如果把这段代码拿到电脑上跑,其实看不出来啥效果,因为现代编译器会自动帮我们做循环展开优化
​ 不过值得注意的是:目前CUDA的编译器还不能帮我们做这种优化,人为的展开核函数内的循环,能够非常大的提升内核性能

展开的归约

​ 在前面的归约问题中,核函数中每个线程块只处理对应那部分的数据,我们现在的想法是能不能用一个线程块处理多块数据,其实这是可以实现的,如果在对这块数据进行求和前(因为总是一个线程对应一个数据)使用每个线程进行一次加法,从别的块取数据,相当于先做一个向量加法,然后再归约,这样将会用一句指令,完成之前一半的计算量,这个性价比看起来太诱人了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
__global__ void reduceUnroll2(int * g_idata, int * g_odata, unsigned int n) {
//set thread ID
unsigned int tid = threadIdx.x;
unsigned int idx = blockDim.x * blockIdx.x * 2 + threadIdx.x;
//boundary check
if (tid >= n) return;
//input data pointer
int *idata = g_idata + blockIdx.x * blockDim.x * 2;
if(idx + blockDim.x<n) {
g_idata[idx] += g_idata[idx+blockDim.x];
}
__syncthreads();
//in-place reduction in global memory
for (int stride = blockDim.x/2; stride>0 ; stride >>=1) {
if (tid < stride) {
idata[tid] += idata[tid + stride];
}
//synchronize within block
__syncthreads();
}
//write result for this block to global mem
if (tid == 0)
g_odata[blockIdx.x] = idata[0];

}
  • unsigned int idx = blockDim.x * blockIdx.x * 2 + threadIdx.x:前线程在全局数据中的索引。由于每个块处理的数据是 2 * blockDim.x,所以乘以2(由于是一维块 block size = blockDim.x)
  • idx + blockDim.x<n:如果下一个块对应的地址还有数据的话,就进行向量加法(idx这个值是不可能计算指向靠后的一个 block 的)
  • for (int stride = blockDim.x/2; stride>0 ; stride >>=1):使用交叉归约的方式

img

​ 上图就是我们循环展开归约的代码

完全展开的归约

​ 接着我们的目标是最后那 32 个线程,因为归约运算是个倒金字塔,最后的结果是一个数,所以每个线程最后 64 个计算得到一个数字结果的过程,每执行一步,线程的利用率就降低一倍,因为从 64 到 32,然后 16。。这样到 1 的,我们现在像个办法,展开最后的6步迭代(64,32,16,8,4,2,1)使用下面的核函数来展开最后6步分支计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
__global__ void reduceUnrollWarp8(int * g_idata,int * g_odata,unsigned int n) {
//set thread ID
unsigned int tid = threadIdx.x;
unsigned int idx = blockDim.x*blockIdx.x*8+threadIdx.x;
//boundary check
if (tid >= n) return;
//convert global data pointer to the
int *idata = g_idata + blockIdx.x*blockDim.x*8;
//unrolling 8;
if(idx+7 * blockDim.x<n) {
int a1=g_idata[idx];
int a2=g_idata[idx+blockDim.x];
int a3=g_idata[idx+2*blockDim.x];
int a4=g_idata[idx+3*blockDim.x];
int a5=g_idata[idx+4*blockDim.x];
int a6=g_idata[idx+5*blockDim.x];
int a7=g_idata[idx+6*blockDim.x];
int a8=g_idata[idx+7*blockDim.x];
g_idata[idx]=a1+a2+a3+a4+a5+a6+a7+a8;

}
__syncthreads();
//in-place reduction in global memory
for (int stride = blockDim.x/2; stride>32; stride >>=1) {
if (tid <stride) {
idata[tid] += idata[tid + stride];
}
//synchronize within block
__syncthreads();
}
//write result for this block to global mem
if(tid<32) {
volatile int *vmem = idata;
vmem[tid]+=vmem[tid+32];
vmem[tid]+=vmem[tid+16];
vmem[tid]+=vmem[tid+8];
vmem[tid]+=vmem[tid+4];
vmem[tid]+=vmem[tid+2];
vmem[tid]+=vmem[tid+1];

}

if (tid == 0)
g_odata[blockIdx.x] = idata[0];

}
  • 上面这段代码将循环展开的归约进一步展开了,刚才是一个线程负责两个 block_size 大小的数据,现在一个线程负责 8 个 block_size 大小的数据。然后进行交叉归约操作,到最后一步又使用了一次归约的操作(在线程数小于 32 的情况下,直接把所有的数据直接加起来)

warp-level reduction(最重要的一步)

1
2
3
4
5
6
7
8
9
if (tid < 32) {
volatile int *vmem = idata;
vmem[tid] += vmem[tid + 32];
vmem[tid] += vmem[tid + 16];
vmem[tid] += vmem[tid + 8];
vmem[tid] += vmem[tid + 4];
vmem[tid] += vmem[tid + 2];
vmem[tid] += vmem[tid + 1];
}
  • 对于 tid < 32 的线程,使用 volatile 关键字确保编译器不会优化掉这些内存访问
  • 通过多次迭代,逐步将32个线程内的数据归约为一个值
  • 当只剩下最后下面三角部分,从 64 个数合并到一个数,首先将前 32 个数,按照步长为 32,进行并行加法,前 32 个 tid 得到 64 个数字的两两和,存在前 32 个数字中
  • 然后这 32 个数加上步长为 16 的变量,理论上,这样能得到 16 个数,这 16 个数的和就是最后这个块的归约结果

这一步可能产生疑惑的另一个原因是既然是同步执行,会不会比如线程 17 加上了线程 33 后写入 17 号的内存了,这时候 1 号才来加 17 号的结果,这样结果就不对了,因为我们的 CUDA 内核从内存中读数据到寄存器,然后进行加法都是同步进行的,也就是 17 号线程和 1 号线程同时读 33 号和 17 号的内存,这样 17 号即便在下一步修改,也不影响1号线程寄存器里面的值了

  • 这样继续计算,得到最后的一个有效的结果就是 tid[0]

volatile int 类型变量是控制变量结果写回到内存,而不是存在共享内存,或者缓存中,因为下一步的计算马上要用到它,如果写入缓存,可能造成下一步的读取会读到错误的数据

真正完全展开的归约

​ 根据上面展开最后64个数据,我们可以直接就展开最后128个,256个,512个,1024个,下面代码可以将 tid<512 的归约计算完全展开:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
__global__ void reduceCompleteUnrollWarp8(int * g_idata,int * g_odata,unsigned int n)
{
//set thread ID
unsigned int tid = threadIdx.x;
unsigned int idx = blockDim.x*blockIdx.x*8+threadIdx.x;
//boundary check
if (tid >= n) return;
//convert global data pointer to the
int *idata = g_idata + blockIdx.x*blockDim.x*8;
if(idx+7 * blockDim.x<n) {
int a1=g_idata[idx];
int a2=g_idata[idx+blockDim.x];
int a3=g_idata[idx+2*blockDim.x];
int a4=g_idata[idx+3*blockDim.x];
int a5=g_idata[idx+4*blockDim.x];
int a6=g_idata[idx+5*blockDim.x];
int a7=g_idata[idx+6*blockDim.x];
int a8=g_idata[idx+7*blockDim.x];
g_idata[idx]=a1+a2+a3+a4+a5+a6+a7+a8;

}
__syncthreads();
//in-place reduction in global memory
if(blockDim.x>=1024 && tid <512)
idata[tid]+=idata[tid+512];
__syncthreads();
if(blockDim.x>=512 && tid <256)
idata[tid]+=idata[tid+256];
__syncthreads();
if(blockDim.x>=256 && tid <128)
idata[tid]+=idata[tid+128];
__syncthreads();
if(blockDim.x>=128 && tid <64)
idata[tid]+=idata[tid+64];
__syncthreads();
//write result for this block to global mem
if(tid<32) {
volatile int *vmem = idata;
vmem[tid]+=vmem[tid+32];
vmem[tid]+=vmem[tid+16];
vmem[tid]+=vmem[tid+8];
vmem[tid]+=vmem[tid+4];
vmem[tid]+=vmem[tid+2];
vmem[tid]+=vmem[tid+1];

}

if (tid == 0)
g_odata[blockIdx.x] = idata[0];

}

#pargma unroll 指令

针对已知循环次数的小循环语句,编译器通常会默认将循环展开。此外,还可以通过 #pragma unroll 指令控制任何给定的循环语句在编译时展开。该指令必须放在循环语句之前并且紧接着循环语句,仅对该循环适用

​ 当一个循环的次数非常大时,往往不可能完全展开,例如有 1000 次循环,展开后代码量可能超过设备上的代码缓存,所以 #pragma unroll 指令后面允许跟随一个整数常量表达式 ICE,此参数是可选的。例如,在指定 ICE 为 20 的情况下,总循环次数 1000 次,每 20 次展开,这样从原本的 1000 次循环,每次完成 1 个任务,变成了循环 50 次 (1000/20=50),每次完成 20 个任务,虽然不能完全展开,但控制成本变成了原本的1/20,同时编译器生成的目标代码体积,也只增加了 20 倍,是一种不错的折中方法

​ 假设 ICE 缺省,如果循环次数是个恒定值,则循环将完全展开;如果 ICE 计算结果为 1,编译器将不会展开循环;如果 ICE 计算结果为非正整数或大于 int 数据类型可表示的最大值的整数,则该 #pragma unroll 指令将被忽略。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct S1_t { static const int value = 4; };
template <int X, typename T2>
__device__ void foo(int *p1, int *p2) {

// no argument specified, loop will be completely unrolled
#pragma unroll
for (int i = 0; i < 12; ++i)
p1[i] += p2[i]*2;

// unroll value = 8
#pragma unroll (X+1)
for (int i = 0; i < 12; ++i)
p1[i] += p2[i]*4;

// unroll value = 1, loop unrolling disabled
#pragma unroll 1
for (int i = 0; i < 12; ++i)
p1[i] += p2[i]*8;

// unroll value = 4
#pragma unroll (T2::value)
for (int i = 0; i < 12; ++i)
p1[i] += p2[i]*16;
}


__global__ void bar(int *p1, int *p2) {
foo<7, S1_t>(p1, p2);
}