In the previous article, we introduced how to run parallel algorithms on the GPU. Those parallel tasks were completely independent of one another, which is quite different from the way we usually think about programming: we benefit from the parallelism, but this unusual execution model can feel rather complex. In this article's Numba code, we will cover some common techniques that allow threads to cooperate during a computation.
First, as usual, load the relevant packages:
from time import perf_counter
 
 import numpy as np
 
 import numba
 from numba import cuda
 
 print(np.__version__)
 print(numba.__version__)
 
 cuda.detect()
 
 # 1.21.6
 # 0.55.2
 
 # Found 1 CUDA devices
 # id 0             b'Tesla T4'                             [SUPPORTED]
 #                       Compute Capability: 7.5
 #                           PCI Device ID: 4
 #                               PCI Bus ID: 0
 #                                     UUID: GPU-bcc6196e-f26e-afdc-1db3-6eba6ff160f0
 #                                 Watchdog: Disabled
 #             FP32/FP64 Performance Ratio: 32
 # Summary:
 # 1/1 devices are supported
 # True
Don't forget that this is CUDA programming, so an NVIDIA GPU is required; if you don't have one, Colab or Kaggle will give you one for free.
Cooperation between threads
A simple parallel reduction algorithm
We will start this section with a very simple problem: summing all the elements of an array. The algorithm is straightforward. Without NumPy, we could implement it like this:
 def sum_cpu(array):
    s = 0.0
    for i in range(array.size):
        s += array[i]
    return s
This doesn't look very Pythonic, but it makes clear that the sum visits every element of the array. If the result s depends on every element of the array, how can we parallelize the algorithm? First, we need to rewrite it in a way that allows parallelization; for the parts that cannot be parallelized, threads must be able to communicate with one another.
So far we haven't learned how to make threads communicate... in fact, we said earlier that threads in different blocks do not communicate at all. We could consider launching just one block, but as we also mentioned last time, on most GPUs a block can hold at most 1024 threads!
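As a quick check (a small sketch of my own, not from the original article), Numba exposes these hardware limits as attributes of the current device:

# Query the current device's limits via Numba's device attributes
device = cuda.get_current_device()
print("Max threads per block:", device.MAX_THREADS_PER_BLOCK)              # 1024 on a Tesla T4
print("Max shared memory per block:", device.MAX_SHARED_MEMORY_PER_BLOCK)  # in bytes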
How do we get around this? What if we split the array into chunks of 1024 elements (or an appropriate threads_per_block) and sum each chunk separately? Then, at the end, we add up the partial sum of each chunk. The figure below shows a very simple two-chunk split.

The figure above illustrates this "divide and conquer" approach to summing the elements of an array.
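Before writing any CUDA, here is a small CPU-only sketch (my own illustration, not code from the article) of that two-stage idea: split the array into chunks, compute one partial sum per chunk, then add up the partial sums.

def sum_chunked_cpu(array, n_chunks):
    # Stage 1: one partial sum per chunk (this is what the CUDA blocks
    # will do in parallel).
    chunk_size = (array.size + n_chunks - 1) // n_chunks
    partial = np.zeros(n_chunks, dtype=np.float64)
    for c in range(n_chunks):
        start = c * chunk_size
        stop = min(start + chunk_size, array.size)
        for i in range(start, stop):
            partial[c] += array[i]
    # Stage 2: reduce the partial sums (done on the CPU in this article).
    return partial.sum()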
How do we do this on the GPU? First we split the array into chunks, and each chunk corresponds to a single CUDA block with a fixed number of threads. Within each block, each thread may sum more than one array element. These per-thread values then have to be summed together, and that is where the threads need to communicate; we discuss how in the next example.
Since we are parallelizing over blocks, the kernel's output should have one entry per block. To finish the reduction, we copy that output to the CPU and complete the job there.
threads_per_block = 1024 # Why not!
 blocks_per_grid = 32 * 80 # Use 32 * multiple of streaming multiprocessors
 
 # Example 2.1: Naive reduction
 @cuda.jit
 def reduce_naive(array, partial_reduction):
    i_start = cuda.grid(1)
    threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
    s_thread = 0.0
    for i_arr in range(i_start, array.size, threads_per_grid):
        s_thread += array[i_arr]
 
    # We need to create a special *shared* array which will be able to be read
    # from and written to by every thread in the block. Each block will have its
    # own shared array. See the warning below!
    s_block = cuda.shared.array((threads_per_block,), numba.float32)
     
    # We now store the local temporary sum of a single thread into the
    # shared array. Since the shared array is sized
    #     threads_per_block == blockDim.x
    # (1024 in this example), we should index it with `threadIdx.x`.
    tid = cuda.threadIdx.x
    s_block[tid] = s_thread
     
    # The next line synchronizes the threads in a block. It ensures that after
    # that line, all values have been written to `s_block`.
    cuda.syncthreads()
 
    # Finally, we need to sum the values from all threads to yield a single
    # value per block. We only need one thread for this.
    if tid == 0:
        # We store the sum of the elements of the shared array in its first
        # coordinate
        for i in range(1, threads_per_block):
            s_block[0] += s_block[i]
        # Move this partial sum to the output. Only one thread is writing here.
        partial_reduction[cuda.blockIdx.x] = s_block[0]
An important caveat here: the shared array has to be "small".
How small? The exact size depends on the GPU's compute capability, typically between 48 KB and 163 KB. See the "Maximum amount of shared memory per thread block" entry in this table.
It also has to have a size known at compile time (which is why we size the shared array with threads_per_block rather than blockDim.x). We could always define a factory function to build kernels for shared arrays of any size... but beware of the compilation time of those kernels.
Finally, the array must be given a Numba dtype, not a NumPy dtype (don't ask why!).
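On the factory-function remark: below is a minimal sketch of the idea (my own illustration, assuming, as Numba normally does, that the closure-captured Python int is folded into the kernel as a compile-time constant). Each distinct size compiles a separate kernel, which is exactly where the long compilation times come from.

def make_reduce_naive(threads_per_block):
    # threads_per_block is a plain Python int here; it is baked into the
    # kernel below at compile time, which is what sizes the shared array.
    @cuda.jit
    def kernel(array, partial_reduction):
        i_start = cuda.grid(1)
        threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
        s_thread = 0.0
        for i_arr in range(i_start, array.size, threads_per_grid):
            s_thread += array[i_arr]

        s_block = cuda.shared.array((threads_per_block,), numba.float32)
        tid = cuda.threadIdx.x
        s_block[tid] = s_thread
        cuda.syncthreads()

        if tid == 0:
            for i in range(1, threads_per_block):
                s_block[0] += s_block[i]
            partial_reduction[cuda.blockIdx.x] = s_block[0]

    return kernel

# Each call triggers a new compilation, e.g.:
# reduce_256 = make_reduce_naive(256)
# reduce_256[blocks_per_grid, 256](dev_a, dev_partial_reduction)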
 N = 1_000_000_000
 a = np.arange(N, dtype=np.float32)
 a /= a.sum() # a will have sum = 1 (to float32 precision)
 
 s_cpu = a.sum()
 
 # Highly-optimized NumPy CPU code
 timing_cpu = np.empty(21)
 for i in range(timing_cpu.size):
    tic = perf_counter()
    a.sum()
    toc = perf_counter()
    timing_cpu[i] = toc - tic
 timing_cpu *= 1e3 # convert to ms
 
 print(f"Elapsed time CPU: {timing_cpu.mean():.0f} ± {timing_cpu.std():.0f} ms")
 # Elapsed time CPU: 354 ± 24 ms
 
 dev_a = cuda.to_device(a)
 dev_partial_reduction = cuda.device_array((blocks_per_grid,), dtype=a.dtype)
 
 reduce_naive[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
 s = dev_partial_reduction.copy_to_host().sum() # Final reduction in CPU
 
 np.isclose(s, s_cpu) # Ensure we have the right number
 # True
 
 timing_naive = np.empty(21)
 for i in range(timing_naive.size):
    tic = perf_counter()
    reduce_naive[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
    s = dev_partial_reduction.copy_to_host().sum()
    cuda.synchronize()
    toc = perf_counter()
    assert np.isclose(s, s_cpu)    
    timing_naive[i] = toc - tic
 timing_naive *= 1e3 # convert to ms
 
 print(f"Elapsed time naive: {timing_naive.mean():.0f} ± {timing_naive.std():.0f} ms")
 # Elapsed time naive: 30 ± 12 ms
Tested on Google Colab, this gives roughly a 10x speedup over the CPU version.
Aside: this is called the naive reduction because it is the simplest algorithm to implement. The Map-Reduce pattern that is so common in big data is essentially this algorithm. It is simple and easy to understand, which is why it is used so often, but it is also notoriously slow; interested readers can dig into it further.
A better parallel reduction algorithm
The algorithm above is about as "naive" as it gets, and there are many tricks for speeding this kind of code up (see the CUDA presentation Optimizing Parallel Reduction in CUDA for benchmarks).
Before introducing the better approach, let's revisit the very end of the kernel:
 if tid == 0: # Single thread taking care of business
    for i in range(1, threads_per_block):
        s_block[0] += s_block[i]
    partial_reduction[cuda.blockIdx.x] = s_block[0]
We parallelized almost every operation, but at the end of the kernel a single thread is responsible for summing all threads_per_block elements of the shared array s_block. Why can't we parallelize that sum as well?
Sounds good, right? The figure below shows how to do this for a threads_per_block of size 16. We start with 8 threads working: the first sums the values in s_block[0] and s_block[8], the second sums s_block[1] and s_block[9], and so on, until the last thread sums s_block[7] and s_block[15].
In the next step only the first 4 threads need to work. The first thread sums s_block[0] and s_block[4]; the second, s_block[1] and s_block[5]; the third, s_block[2] and s_block[6]; the fourth and last, s_block[3] and s_block[7].
In the third step, only 2 threads are needed to handle the first 4 elements of s_block.
The fourth and final step uses a single thread to sum the last 2 elements.
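To see the halving pattern on its own, here is a pure-Python sketch (CPU-only, for illustration) of this tree reduction applied to 16 values; the while loop mirrors the one we are about to add to the kernel.

def tree_reduce_cpu(values):
    # `values` plays the role of the shared array s_block; its length should
    # be a power of two, just like threads_per_block.
    s = list(values)
    i = len(s) // 2
    while i > 0:
        # In the kernel, each of these additions is performed by its own thread.
        for tid in range(i):
            s[tid] += s[tid + i]
        i //= 2
    return s[0]

print(tree_reduce_cpu(range(16)))  # 120, i.e. sum(range(16))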
Since the work is spread across threads, it is parallelized. The split is not perfectly even across threads, but it is a clear improvement: computationally, this algorithm is O(log2(threads_per_block)), whereas the naive algorithm above is O(threads_per_block). In our example, that is 10 steps instead of 1024 operations.
There is one final detail: at each step, we need to make sure that all threads have finished writing to the shared array. So we have to call cuda.syncthreads().

 # Example 2.2: Better reduction
 @cuda.jit
 def reduce_better(array, partial_reduction):
    i_start = cuda.grid(1)
    threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
    s_thread = 0.0
    for i_arr in range(i_start, array.size, threads_per_grid):
        s_thread += array[i_arr]
 
    # We need to create a special *shared* array which will be able to be read
    # from and written to by every thread in the block. Each block will have its
    # own shared array. See the warning below!
    s_block = cuda.shared.array((threads_per_block,), numba.float32)
     
    # We now store the local temporary sum of the thread into the shared array.
    # Since the shared array is sized threads_per_block == blockDim.x,
    # we should index it with `threadIdx.x`.
    tid = cuda.threadIdx.x
    s_block[tid] = s_thread
     
    # The next line synchronizes the threads in a block. It ensures that after
    # that line, all values have been written to `s_block`.
    cuda.syncthreads()
 
    i = cuda.blockDim.x // 2
    while (i > 0):
        if (tid < i):
            s_block[tid] += s_block[tid + i]
        cuda.syncthreads()
        i //= 2
 
    if tid == 0:
        partial_reduction[cuda.blockIdx.x] = s_block[0]
 
 
 reduce_better[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
 s = dev_partial_reduction.copy_to_host().sum() # Final reduction in CPU
 
 np.isclose(s, s_cpu)
 # True
 
timing_better = np.empty(21)
for i in range(timing_better.size):
    tic = perf_counter()
    reduce_better[blocks_per_grid, threads_per_block](dev_a, dev_partial_reduction)
    s = dev_partial_reduction.copy_to_host().sum()
    cuda.synchronize()
    toc = perf_counter()
    assert np.isclose(s, s_cpu)
    timing_better[i] = toc - tic
timing_better *= 1e3 # convert to ms

print(f"Elapsed time better: {timing_better.mean():.0f} ± {timing_better.std():.0f} ms")
# Elapsed time better: 22 ± 1 ms
As you can see, this is roughly 25% faster than the naive version.
Important note: you might be tempted to move the syncthreads call inside the if block, since after each step more than half of the threads sit idle. However, doing so would make the CUDA threads that do call syncthreads stop and wait for all the others, while those other threads keep running and never reach the call. The stopped threads would then wait forever for threads that never synchronize. The rule of thumb: if you synchronize threads, make sure cuda.syncthreads() is called in all threads, never inside a divergent branch:
i = cuda.blockDim.x // 2
while (i > 0):
    if (tid < i):
        s_block[tid] += s_block[tid + i]
        # cuda.syncthreads()  # WRONG: only the threads inside the if reach it
    cuda.syncthreads()  # RIGHT: every thread in the block reaches this call
    i //= 2
Automatic reduction with Numba
Reductions are in fact not simple to get right, so Numba provides a convenient cuda.reduce decorator that turns a binary (two-argument) function into a reduction. The long and intricate algorithm above can therefore be replaced with:
@cuda.reduce
 def reduce_numba(a, b):
    return a + b
 
 # Compile and check
 s = reduce_numba(dev_a)
 
 np.isclose(s, s_cpu)
 # True
 
 # Time
 timing_numba = np.empty(21)
 for i in range(timing_numba.size):
    tic = perf_counter()
    s = reduce_numba(dev_a)
    toc = perf_counter()
    assert np.isclose(s, s_cpu)    
    timing_numba[i] = toc - tic
 timing_numba *= 1e3 # convert to ms
 
 print(f"Elapsed time better: {timing_numba.mean():.0f} ± {timing_numba.std():.0f} ms")
 # Elapsed time better: 45 ± 0 ms
From the timings above we can see that the hand-written kernel is usually quite a bit faster (at least 2x here), while the version Numba gives us is far easier to use. That's good news for us: for once, a Python routine we implemented ourselves beats the library one.
One more thing to note: by default, the reduction copies its result back to the host, which forces a synchronization. To avoid this, you can call the reduction with a device array as the output:
dev_s = cuda.device_array((1,), dtype=s)
 
 reduce_numba(dev_a, res=dev_s)
 
 s = dev_s.copy_to_host()[0]
 np.isclose(s, s_cpu)
 # True
A 2D reduction example
Parallel reduction techniques are great, but how do we extend them to higher dimensions? While we can always call the 1D reduction on a flattened view of the array (array2d.ravel()), it is important to understand how to reduce multi-dimensional arrays by hand.
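For example, here is a quick sketch (not part of the original code) that reuses reduce_numba from the previous section on a flattened copy:

array2d_demo = np.arange(16, dtype=np.float32).reshape(4, 4)   # any 2D array
dev_flat = cuda.to_device(array2d_demo.ravel())                # contiguous 1D copy
print(np.isclose(reduce_numba(dev_flat), array2d_demo.sum()))  # True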
In the example below, we combine what we have just learned to reduce a 2D array.
 threads_per_block_2d = (16, 16) # 256 threads total
 blocks_per_grid_2d = (64, 64)
 
 # Total number of threads in a 2D block (has to be an int)
 shared_array_len = int(np.prod(threads_per_block_2d))
 
 # Example 2.4: 2D reduction with 1D shared array
 @cuda.jit
 def reduce2d(array2d, partial_reduction2d):
    ix, iy = cuda.grid(2)
    threads_per_grid_x, threads_per_grid_y = cuda.gridsize(2)
 
    s_thread = 0.0
    # Stride each axis by the total number of threads along that same axis
    for i0 in range(iy, array2d.shape[0], threads_per_grid_y):
        for i1 in range(ix, array2d.shape[1], threads_per_grid_x):
            s_thread += array2d[i0, i1]
 
    # Allocate shared array
    s_block = cuda.shared.array(shared_array_len, numba.float32)
 
    # Index the threads linearly: each tid identifies a unique thread in the
    # 2D grid.
    tid = cuda.threadIdx.x + cuda.blockDim.x * cuda.threadIdx.y
    s_block[tid] = s_thread
 
    cuda.syncthreads()
 
    # We can use the same smart reduction algorithm by remembering that
    #     shared_array_len == blockDim.x * cuda.blockDim.y
    # So we just need to start our indexing accordingly.
    i = (cuda.blockDim.x * cuda.blockDim.y) // 2
    while (i != 0):
        if (tid < i):
            s_block[tid] += s_block[tid + i]
        cuda.syncthreads()
        i //= 2
     
    # Store reduction in a 2D array the same size as the 2D blocks
    if tid == 0:
        partial_reduction2d[cuda.blockIdx.x, cuda.blockIdx.y] = s_block[0]
 
 
 N_2D = (20_000, 20_000)
 a_2d = np.arange(np.prod(N_2D), dtype=np.float32).reshape(N_2D)
 a_2d /= a_2d.sum() # a_2d will have sum = 1 (to float32 precision)
 
 s_2d_cpu = a_2d.sum()
 
 dev_a_2d = cuda.to_device(a_2d)
 dev_partial_reduction_2d = cuda.device_array(blocks_per_grid_2d, dtype=a.dtype)
 
 reduce2d[blocks_per_grid_2d, threads_per_block_2d](dev_a_2d, dev_partial_reduction_2d)
 s_2d = dev_partial_reduction_2d.copy_to_host().sum() # Final reduction in CPU
 
 np.isclose(s_2d, s_2d_cpu) # Ensure we have the right number
 # True
 
 timing_2d = np.empty(21)
 for i in range(timing_2d.size):
    tic = perf_counter()
    reduce2d[blocks_per_grid_2d, threads_per_block_2d](dev_a_2d, dev_partial_reduction_2d)
    s_2d = dev_partial_reduction_2d.copy_to_host().sum()
    cuda.synchronize()
    toc = perf_counter()
    assert np.isclose(s_2d, s_2d_cpu)    
    timing_2d[i] = toc - tic
 timing_2d *= 1e3 # convert to ms
 
 print(f"Elapsed time better: {timing_2d.mean():.0f} ± {timing_2d.std():.0f} ms")
 # Elapsed time better: 15 ± 4 ms
Device functions
So far we have only discussed kernels, the special GPU functions that launch threads. Kernels usually rely on smaller functions that are defined on the GPU and can only access GPU arrays. These are called device functions. Unlike kernels, device functions can return a value.
We will show an example that uses a device function from different kernels. The example will also demonstrate how important it is to synchronize threads when using shared arrays.
In newer versions of CUDA, kernels can launch other kernels. This is called dynamic parallelism, but Numba's CUDA API does not support it yet.
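As a minimal sketch of the kernel/device-function distinction (my own example, not code from the article): the device function below returns a value, while the kernel that calls it can only hand results back by writing into an output array.

@cuda.jit(device=True)
def square_plus_one(x):
    # Device functions can return a value to their caller on the GPU...
    return x * x + 1.0

@cuda.jit
def apply_square_plus_one(arr, out):
    # ...kernels cannot return values; they write into output arrays instead.
    i = cuda.grid(1)
    if i < arr.size:
        out[i] = square_plus_one(arr[i])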
We will create a ripple pattern in a fixed-size array. First we need to declare the number of threads we will use, since the shared array requires it.
 threads_16 = 16
 
 import math
 
 @cuda.jit(device=True, inline=True) # inlining can speed up execution
 def amplitude(ix, iy):
    return (1 + math.sin(2 * math.pi * (ix - 64) / 256)) * (
        1 + math.sin(2 * math.pi * (iy - 64) / 256)
    )
 
 # Example 2.5a: 2D Shared Array
 @cuda.jit
 def blobs_2d(array2d):
    ix, iy = cuda.grid(2)
    tix, tiy = cuda.threadIdx.x, cuda.threadIdx.y
 
    shared = cuda.shared.array((threads_16, threads_16), numba.float32)
    shared[tiy, tix] = amplitude(iy, ix)
    cuda.syncthreads()
 
    array2d[iy, ix] = shared[15 - tiy, 15 - tix]
 
 # Example 2.5b: 2D Shared Array without synchronize
 @cuda.jit
 def blobs_2d_wrong(array2d):
    ix, iy = cuda.grid(2)
    tix, tiy = cuda.threadIdx.x, cuda.threadIdx.y
 
    shared = cuda.shared.array((threads_16, threads_16), numba.float32)
    shared[tiy, tix] = amplitude(iy, ix)
 
    # When we don't sync threads, we may have not written to shared
    # yet, or even have overwritten it by the time we write to array2d
    array2d[iy, ix] = shared[15 - tiy, 15 - tix]
 
 
 N_img = 1024
 blocks = (N_img // threads_16, N_img // threads_16)
 threads = (threads_16, threads_16)
 
 dev_image = cuda.device_array((N_img, N_img), dtype=np.float32)
 dev_image_wrong = cuda.device_array((N_img, N_img), dtype=np.float32)
 
 blobs_2d[blocks, threads](dev_image)
 blobs_2d_wrong[blocks, threads](dev_image_wrong)
 
 image = dev_image.copy_to_host()
 image_wrong = dev_image_wrong.copy_to_host()
 
 import matplotlib.pyplot as plt
 
 fig, (ax1, ax2) = plt.subplots(1, 2)
 ax1.imshow(image.T, cmap="nipy_spectral")
 ax2.imshow(image_wrong.T, cmap="nipy_spectral")
 for ax in (ax1, ax2):
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_xticklabels([])
    ax.set_yticklabels([])

Left: result of the synchronized (correct) kernel. Right: result of the unsynchronized (incorrect) kernel.
Summary
This article showed how to develop kernels that use the reduction pattern on 1D and 2D arrays. Along the way, we learned how to use shared arrays and device functions. If you found this interesting, the full source code is here:
https://colab.research.google.com/drive/1GkGLDexnYUnl2ilmwNxAlWAH6Eo5ZK2f?usp=sharing