雙核CPU上的快速排序效率
為了試驗(yàn)一下多核CPU上排序算法的效率,得比較單任務(wù)情況下和多任務(wù)并行排序算法的差距,因此選用快速排序算法來進(jìn)行比較。
測(cè)試環(huán)境:雙核CPU 2.66GHZ
單核CPU 2.4GHZ
以下是一個(gè)快速排序算法的源代碼:
- UINTSplit(void **ppData, UINTuStart, UINTuEnd,
 - COMPAREFUNCCompareFunc)
 - {
 - void *pSelData;
 - UINTuLow;
 - UINTuHigh;
 - uLow = uStart;
 - uHigh = uEnd;
 - pSelData = ppData[uLow];
 - while ( uLow < uHigh )
 - {
 - while ( (*CompareFunc)(ppData[uHigh], pSelData) > 0
 - && uLow != uHigh )
 - {
 - --uHigh;
 - }
 - if ( uHigh != uLow )
 - {
 - ppData[uLow] = ppData[uHigh];
 - ++uLow;
 - }
 - while ( (*CompareFunc)( ppData[uLow], pSelData ) < 0
 - && uLow != uHigh )
 - {
 - ++uLow;
 - }
 - if ( uLow != uHigh )
 - {
 - ppData[uHigh] = ppData[uLow];
 - --uHigh;
 - }
 - }
 - ppData[uLow] = pSelData;
 - returnuLow;
 - }
 - voidQuickSort(void **ppData, UINTuStart, UINTuEnd,
 - COMPAREFUNCCompareFunc)
 - {
 - UINTuMid = Split(ppData, uStart, uEnd, CompareFunc );
 - if ( uMid > uStart )
 - {
 - QuickSort(ppData, uStart, uMid - 1, CompareFunc);
 - }
 - if ( uEnd > uMid )
 - {
 - QuickSort(ppData, uMid + 1, uEnd, CompareFunc);
 - }
 - }
 
先測(cè)試一下這個(gè)快速排序算法排一百萬個(gè)隨機(jī)整數(shù)所花的時(shí)間:
- voidTest_QuickSort(void)
 - {
 - UINTi;
 - UINTuCount = 1000000; //1000000個(gè)
 - srand(time(NULL));
 - void **pp = (void **)malloc(uCount * sizeof(void *));
 - for ( i = 0; i < uCount; i++ )
 - {
 - pp[i] = (void *)(rand() % uCount);
 - }
 - clockclock_tt1 = clock();
 - QuickSort(pp, 0, uCount-1, UIntCompare);
 - clockclock_tt2 = clock();
 - printf("QuickSort 1000000 Time %ld/n", t2-t1);
 - free(pp);
 - }
 
在雙核CPU2.66GHZ機(jī)器上運(yùn)行測(cè)試程序,打印出花費(fèi)的時(shí)間約為406 ms
在單核CPU2.4GHZ機(jī)器上運(yùn)行測(cè)試程序,打印出花費(fèi)時(shí)間約為484ms
可見在雙核CPU上運(yùn)行單任務(wù)程序和單核CPU基本是一樣的,效率沒有任何提高。
下面再來把上面的快速排序程序變成并行的,一個(gè)簡(jiǎn)單的方法就是將要排序的區(qū)間分成相同的幾個(gè)段,然后對(duì)每個(gè)段進(jìn)行快速排序,排序完后再使用歸并算法將排好的幾個(gè)區(qū)間歸并成一個(gè)排好序的表,我們先四個(gè)線程來進(jìn)行排序,代碼如下:
- void ** Merge(void **ppData, UINTuStart, UINTuEnd,
 - void **ppData2, UINTuStart2, UINTuEnd2, COMPAREFUNCcfunc)
 - {
 - UINTi, j, k;
 - UINTu1, u2, v1,v2;
 - void **pp1;
 - void **pp2;
 - void **pp = (void **)malloc( (uEnd-uStart+1+uEnd2-uStart2+1) * sizeof(void *));
 - if ( pp == NULL )
 - {
 - returnNULL;
 - }
 - if ( (*cfunc)(ppData2[uStart2], ppData[uStart]) > 0 )
 - {
 - u1 = uStart;
 - u2 = uEnd;
 - v1 = uStart2;
 - v2 = uEnd2;
 - pp1 = ppData;
 - pp2 = ppData2;
 - }
 - else
 - {
 - u1 = uStart2;
 - u2 = uEnd2;
 - v1 = uStart;
 - v2 = uEnd;
 - pp1 = ppData2;
 - pp2 = ppData;
 - }
 - k = 0;
 - pp[k] = pp1[u1];
 - j = v1;
 - for (i = u1+1; i <= u2; i++ )
 - {
 - while ( j <= v2 )
 - {
 - if ( (*cfunc)(pp2[j], pp1[i]) < 0 )
 - {
 - ++k;
 - pp[k] = pp2[j];
 - j++;
 - }
 - else
 - {
 - break;
 - }
 - }
 - ++k;
 - pp[k] = pp1[i];
 - }
 - if ( j < v2 )
 - {
 - for ( i = j; i <= v2; i++)
 - {
 - ++k;
 - pp[k] = pp2[i];
 - }
 - }
 - returnpp;
 - }
 - typedefstructSORTNODE_st {
 - void ** ppData;
 - UINT uStart;
 - UINT uEnd;
 - COMPAREFUNCfunc;
 - } SORTNODE;
 - DWORDWINAPIQuickSort_Thread(void *arg)
 - {
 - SORTNODE *pNode = (SORTNODE *)arg;
 - QuickSort(pNode->ppData, pNode->uStart, pNode->uEnd, pNode->func);
 - return 1;
 - }
 - #define THREAD_COUNT 4
 - INTMQuickSort(void **ppData, UINTuStart, UINTuEnd,
 - COMPAREFUNCCompareFunc)
 - {
 - void **pp1;
 - void **pp2;
 - void **pp3;
 - INT i;
 - SORTNODE Node[THREAD_COUNT];
 - HANDLE hThread[THREAD_COUNT];
 - INT nRet = CAPI_FAILED;
 - for ( i = 0; i < THREAD_COUNT; i++)
 - {
 - Node[i].ppData = ppData;
 - if ( i == 0 )
 - {
 - Node[i].uStart = uStart;
 - }
 - else
 - {
 - Node[i].uStart = uEnd * i /THREAD_COUNT + 1;
 - }
 - Node[i].uEnd = uEnd *(i+1) / THREAD_COUNT;
 - Node[i].func = CompareFunc;
 - hThread[i] = CreateThread(NULL, 0, QuickSort_Thread, &(Node[i]), 0, NULL);
 - }
 - for ( i = 0; i < THREAD_COUNT; i++ )
 - {
 - WaitForSingleObject(hThread[i], INFINITE);
 - }
 - pp1 = Merge(ppData, uStart, uEnd/4, ppData, uEnd/4+1, uEnd/2, CompareFunc);
 - pp2 = Merge(ppData, uEnd/2+1, uEnd*3/4, ppData, uEnd*3/4+1, uEnd, CompareFunc);
 - if ( pp1 != NULL && pp2 != NULL )
 - {
 - pp3 = Merge(pp1, 0, uEnd/2-uStart, pp2, 0, uEnd - uEnd/2 - 1, CompareFunc);
 - if ( pp3 != NULL )
 - {
 - UINTi;
 - for ( i = uStart; i <= uEnd; i++)
 - {
 - ppData[i] = pp3[i-uStart];
 - }
 - free(pp3);
 - nRet = CAPI_SUCCESS;
 - }
 - }
 - if( pp1 != NULL)
 - {
 - free( pp1 );
 - }
 - if ( pp2 != NULL )
 - {
 - free( pp2 );
 - }
 - returnnRet;
 - }
 
#p#
用下面程序來測(cè)試一下排1百萬個(gè)隨機(jī)整數(shù)的花費(fèi)時(shí)間:
- voidTest_MQuickSort (void)
 - {
 - UINTi;
 - UINTuCount = 1000000; //1000個(gè)
 - srand(time(NULL));
 - void **pp = (void **)malloc(uCount * sizeof(void *));
 - for ( i = 0; i < uCount; i++ )
 - {
 - pp[i] = (void *)(rand() % uCount);
 - }
 - clockclock_tt1 = clock();
 - INTnRet = MQuickSort(pp, 0, uCount-1, UIntCompare);
 - clockclock_tt2 = clock();
 - printf("MQuickSort 1000000 Time %ld/n", t2-t1);
 - free(pp);
 - }
 
在雙核CPU上運(yùn)行后,打印出花費(fèi)的時(shí)間為 234 ms , 單任務(wù)版的快速排序函數(shù)約需406ms左右,并行運(yùn)行效率為:406/(2×234) = 86.7% 左右。運(yùn)行速度快了172ms。
可見雙核CPU中,多任務(wù)程序速度還是有很大提高的。
當(dāng)然上面的多任務(wù)版的快速排序程序還有很大的改進(jìn)余地,當(dāng)對(duì)4個(gè)區(qū)間排好序后,后面的歸并操作都是在一個(gè)任務(wù)里運(yùn)行的,對(duì)整體效率會(huì)產(chǎn)生影響。估計(jì)將程序繼續(xù)優(yōu)化后,速度還能再快一些。
原文鏈接:http://blog.csdn.net/drzhouweiming/article/details/1109499















 
 
 

 
 
 
 