Windows 異步IO的幾種實現方式


Windows上的異步IO有好幾種實現方式。


設備內核對象

這是最簡單的一種了,直接用設備內核對象的狀態。比如文件句柄,線程句柄等等,這些內核對象都是有一個觸發狀態的,比如當一個線程結束后,線程內核對象就會被觸發。

對於文件內核對象,如果一個異步IO完成了,就會把文件句柄設置為觸發狀態,但是有個問題就是:如果有多個異步io,那么只要有一個異步io完成了,文件句柄就會被設置為觸發狀態。這樣,就不能應用於多個異步io的情況,因為根本不知道是哪個異步io完成了。

代碼例子:

/* 設備內核對象

通過設備內核對象來得到異步IO完成通知。
使用很簡單。
缺點:如果對一個設備發起了多個io請求,那么這個辦法就不行了。
這是因為,多個異步io請求里面任何一個完成了,都會把設備內核對象的狀態
設置成觸發。這樣當WaitForSingleObject返回的時候就無法知道是什么io完成了。
*/
void DeviceObjIO()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_WRITE, 0, 0, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, 0);//創建一個文件,設置成異步IO,FILE_FLAG_OVERLAPPED
BYTE buffer[10] = {'a', 'b', 'c', 'd'};
OVERLAPPED ol = {0};//初始化OVERLAPPED的結構
ol.Offset = 2;//表示從第三個字節開始寫

BOOL rt = WriteFile(hFile, buffer, 5, NULL, &ol);//發起一個異步寫操作

//這里可以做其他事情了,因為WriteFile是異步了,會馬上返回。如果是同步的WriteFile,那么假如WriteFile需要10秒鍾,WriteFile在10秒內是不會返回的。

// SetFileCompletionNotificationModes(hFile, FILE_SKIP_SET_EVENT_ON_HANDLE);//如果設置了這個標記,那么文件內核對象就不會被觸發了。

if (rt == FALSE && GetLastError() == ERROR_IO_PENDING)//檢查異步IO是否完成了,可以在其他線程檢查。這里只是一個演示。
{
WaitForSingleObject(hFile, INFINITE);//等待設備內核對象(文件)被觸發。
}

CloseHandle(hFile);
}

事件內核對象

事件內核對象比設備內核對象好一點,可以支持多個異步io。每個read或者write里面的overlapped的結構,我們可以設置一個事件內核對象,這樣每次io的事件內核對象是不一樣的,就可以支持多個異步io了。但是WaitForMultipleObject也有個限制,就是一次等待的事件內核對象最多只能有64個,當然也可以變通一下,比如把所有的事件內核對象分組等。

/* 事件內核對象

通過一個事件內核對象來獲取異步IO完成的通知。
這個比較設備內核對象好一些,可以支持多個異步IO,因為程序可以通過每個異步IO的OVERLAPPED結構里面的事件內核對象來判斷
異步IO是否完成了。
*/
void EventObjIO()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_READ, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開前面創建的文件
BYTE buffer[10] = {0};
OVERLAPPED ol = {0};
ol.Offset = 0;
HANDLE hEvent = CreateEvent(0, FALSE, FALSE, NULL);
ol.hEvent = hEvent;//傳遞一個事件對象。

BOOL rt = ReadFile(hFile, buffer, 7, NULL, &ol);//提交一個異步讀操作

// DWORD read = 0;
// GetOverlappedResult(hFile, &ol, &read, TRUE);//也可以使用這個函數來等待執行結果。GetOverlappedResult內部會調用WaitForSingleObject來檢查異步IO是否完成了。

if (rt == FALSE && GetLastError() == ERROR_IO_PENDING)
{
WaitForSingleObject(ol.hEvent, INFINITE);//等待事件對象被觸發。
}

CloseHandle(hFile);
CloseHandle(hEvent);
}

APC 隊列

每個線程都有一個APC隊列。曾經微軟公司大力推廣這個技術,但是有個最大的問題,就是回調函數只能在調用線程里面被調用。如果調用線程忙,那么就得不到callback的執行。個人從來沒有使用過這種方式。

/* 可提醒IO

通過線程的APC隊列
當系統創建一個線程的時候,同時會創建一個線程APC隊列
APC: Asynchronous Procedure Call
在主調線程里面需要通過可提醒等待函數來等待回調函數的執行。
最大的缺點就是:回調函數將會在主調線程里面。如果主調線程在做其他事情,
就算異步IO已經完成了,而且驅動已經把完成的IO添加到線程的APC隊列,線程也沒有辦法來執行回調函數。
*/
VOID
WINAPI myCallback(
__in DWORD dwErrorCode,
__in DWORD dwNumberOfBytesTransfered,
__inout LPOVERLAPPED lpOverlapped
)
{
printf("transfered: %d\n", dwNumberOfBytesTransfered);
}

void APCIO()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_WRITE, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開前面創建的文件
BYTE buffer[10] = {'f', 'f', 'f'};
OVERLAPPED ol = {0};
ol.Offset = 5;

BOOL rt = WriteFileEx(hFile, buffer, 3, &ol, myCallback);//跟前面的WriteFile不一樣,這里會返回TRUE.

//再寫一次
BYTE buffer2[10] = {'g', 'g', 'g', 'g'};
OVERLAPPED ol2 = {0};
ol2.Offset = 8;

rt = WriteFileEx(hFile, buffer2, 4, &ol2, myCallback);

// Sleep(5000);//在這5秒內,就算異步IO完成了,回調函數也不會被執行。因為回調函數只能在當前線程中執行,但是這個線程在做其他事情:睡覺。

DWORD dwError = SleepEx(10000, TRUE);//如果APC被成功調用的話,那么這個函數會返回,同時返回值是WAIT_IO_COMPLETION.SleepEx跟Sleep的區別是SleepEx是可提醒的,如果
//當前線程的APC隊列里面有東西了,那么線程將被喚醒,並且執行回調函數。回調函數結束后,SleepEx也將返回。

if(dwError == WAIT_IO_COMPLETION)
{
printf("APC successfully\n");
}

CloseHandle(hFile);

}


完成端口

異步io里面最高效的一種方式了。

Windows上服務器模型最高效的就是socket + 完成端口了。這里的例子是文件+完成端口。

把文件句柄和完成端口相綁定,然后發起異步io,這樣當有異步io完成的時候,這個通知就會把發送到完成端口內核對象的IO完成隊列,然后完成端口會找個線程池里面找到一個等待的線程並且喚醒。

線程池里面的線程有程序員創建,一般創建CPU * 2個線程。每個創建的線程調用GetQueuedCompletionStatus(), 這個線程就會被放到完成端口的“等待線程隊列”里面。

下面是個簡單的例子。

/* 完成端口

*/


class MyOverlapped: public OVERLAPPED
{
public:
MyOverlapped()
{
memset(this, 0, sizeof(MyOverlapped));

m_buf = new BYTE[10];
memset(m_buf, 0, 10);
}

~MyOverlapped()
{
if(m_buf)
{
delete m_buf;
m_buf = NULL;
}
}

BYTE* GetByte() const{return m_buf;}

protected:
BYTE* m_buf;
};

void WorkThread(void* pv)
{
HANDLE hCP = (HANDLE)pv;

while (TRUE)
{
DWORD dwTransfer = 0;
MyOverlapped* ol = NULL;

ULONG_PTR key = 0;
//查詢完成端口上的完成隊列,看看是否有已經完成的異步IO.
BOOL rt = GetQueuedCompletionStatus(hCP, &dwTransfer, &key, (LPOVERLAPPED*)&ol, INFINITE);//第三個參數不可以傳0,不然會得到一個998的錯誤
DWORD err = GetLastError();
if (!rt)
{
break;
}

printf("IOCP recv: %d bytes, thread id: %d\n", dwTransfer, GetCurrentThreadId());
}
}

void IOCP()
{
//完成端口是一個內核對象,但是這個創建函數為什么沒有安全參數呢?
//據說所有的內核對象里面,就是完成端口是個例外。根據書上說的,可能是因為完成端口的設計初衷就是在一個進程里面使用。
HANDLE hCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//創建一個完成端口對象,最后一個參數,表示使用默認的並發線程數,CPU的數量。

HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_ALL, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開一個設備

//將這個設備關聯到完成端口,這樣當設備上的異步IO完成的時候,就會通知完成端口的完成隊列。
HANDLE h = CreateIoCompletionPort(hFile, hCP, NULL, 0);

assert(h == hCP);//返回值肯定和之前已經創建的完成端口對象一樣

MyOverlapped ol;//使用一個自定義的OVERLAPPED
ol.Offset = 4;

BOOL rt = ReadFile(hFile, ol.GetByte(), 4, NULL, &ol);//發起一個異步IO

MyOverlapped ol2;
ol2.Offset = 5;

rt = ReadFile(hFile, ol2.GetByte(), 5, NULL, &ol2);//再發起一個異步IO

DWORD dwErr = GetLastError();

//一般會創建CPU個數 * 2 個處理線程。
_beginthread(WorkThread, 0, hCP);//啟動一個處理線程。
_beginthread(WorkThread, 0, hCP);//再啟動一個處理線程。

Sleep(2000);

CloseHandle(hFile);
CloseHandle(hCP);
}


線程池 + 完成端口

上面的例子里面,線程是自己創建的,需要自己來維護線程的創建和銷毀。Windows提供了線程池來和完成端口相配合,這樣省去了創建和銷毀線程的麻煩。

Windows的線程池有很多功能,比如和timer一起使用,和完成端口一起使用。

CreateThreadPoolIo會創建一個完成端口,把想要綁定的io對象,如文件句柄傳給它,這樣線程池里面的完成端口對象就和文件句柄綁定了。

簡單例子如下:

/*線程池 + 完成端口
*/

VOID CALLBACK OverlappedCompletionRoutine(PTP_CALLBACK_INSTANCE pInstance,
PVOID pvContext,
PVOID pOverlapped,
ULONG IoResult,
ULONG_PTR NumberOfBytesTransferred,
PTP_IO pIo)
{
printf("OverlappedCompletionRoutine, transferred: %d bytes\n", NumberOfBytesTransferred);
}



void IOCP_ThreadPool()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_ALL, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開一個設備

PTP_IO pio = CreateThreadpoolIo(hFile, OverlappedCompletionRoutine, NULL, NULL);//將設備對象和線程池的IO完成端口關聯起來。
DWORD err = GetLastError();


MyOverlapped ol;//使用一個自定義的OVERLAPPED
ol.Offset = 4;

StartThreadpoolIo(pio);//每次發起一個異步io請求的時候,都要調一下這個函數,不然在CloseThreadpoolIo()的時候會出異常。
BOOL rt = ReadFile(hFile, ol.GetByte(), 4, NULL, &ol);//發起一個異步IO


MyOverlapped ol2;
ol2.Offset = 5;

StartThreadpoolIo(pio);
rt = ReadFile(hFile, ol2.GetByte(), 5, NULL, &ol2);//再發起一個異步IO


Sleep(4000);

CloseHandle(hFile);//關閉文件對象
CloseThreadpoolIo(pio);//關閉線程池io完成對象
}


所有異步io里面,個人用的最多的其實是完成端口 + 自己創建線程的方式了。當然每種方式都有自己的優缺點,具體使用哪種還是需要看應用場景,沒有最好的方式,只有合適的方式。


附:

完整測試例子

// AsyncIO.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <Windows.h>
#include <process.h>
#include <assert.h>

/* 設備內核對象

通過設備內核對象來得到異步IO完成通知。
使用很簡單。
缺點:如果對一個設備發起了多個io請求,那么這個辦法就不行了。
這是因為,多個異步io請求里面任何一個完成了,都會把設備內核對象的狀態
設置成觸發。這樣當WaitForSingleObject返回的時候就無法知道是什么io完成了。
*/
void DeviceObjIO()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_WRITE, 0, 0, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, 0);//創建一個文件,設置成異步IO,FILE_FLAG_OVERLAPPED
BYTE buffer[10] = {'a', 'b', 'c', 'd'};
OVERLAPPED ol = {0};//初始化OVERLAPPED的結構
ol.Offset = 2;//表示從第三個字節開始寫

BOOL rt = WriteFile(hFile, buffer, 5, NULL, &ol);//發起一個異步寫操作

//這里可以做其他事情了,因為WriteFile是異步了,會馬上返回。如果是同步的WriteFile,那么假如WriteFile需要10秒鍾,WriteFile在10秒內是不會返回的。

// SetFileCompletionNotificationModes(hFile, FILE_SKIP_SET_EVENT_ON_HANDLE);//如果設置了這個標記,那么文件內核對象就不會被觸發了。

if (rt == FALSE && GetLastError() == ERROR_IO_PENDING)//檢查異步IO是否完成了,可以在其他線程檢查。這里只是一個演示。
{
WaitForSingleObject(hFile, INFINITE);//等待設備內核對象(文件)被觸發。
}

CloseHandle(hFile);
}

/* 事件內核對象

通過一個事件內核對象來獲取異步IO完成的通知。
這個比較設備內核對象好一些,可以支持多個異步IO,因為程序可以通過每個異步IO的OVERLAPPED結構里面的事件內核對象來判斷
異步IO是否完成了。
*/
void EventObjIO()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_READ, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開前面創建的文件
BYTE buffer[10] = {0};
OVERLAPPED ol = {0};
ol.Offset = 0;
HANDLE hEvent = CreateEvent(0, FALSE, FALSE, NULL);
ol.hEvent = hEvent;//傳遞一個事件對象。

BOOL rt = ReadFile(hFile, buffer, 7, NULL, &ol);//提交一個異步讀操作

// DWORD read = 0;
// GetOverlappedResult(hFile, &ol, &read, TRUE);//也可以使用這個函數來等待執行結果。GetOverlappedResult內部會調用WaitForSingleObject來檢查異步IO是否完成了。

if (rt == FALSE && GetLastError() == ERROR_IO_PENDING)
{
WaitForSingleObject(ol.hEvent, INFINITE);//等待事件對象被觸發。
}

CloseHandle(hFile);
CloseHandle(hEvent);
}

/* 可提醒IO

通過線程的APC隊列
當系統創建一個線程的時候,同時會創建一個線程APC隊列
APC: Asynchronous Procedure Call
在主調線程里面需要通過可提醒等待函數來等待回調函數的執行。
最大的缺點就是:回調函數將會在主調線程里面。如果主調線程在做其他事情,
就算異步IO已經完成了,而且驅動已經把完成的IO添加到線程的APC隊列,線程也沒有辦法來執行回調函數。
*/
VOID
WINAPI myCallback(
__in DWORD dwErrorCode,
__in DWORD dwNumberOfBytesTransfered,
__inout LPOVERLAPPED lpOverlapped
)
{
printf("transfered: %d\n", dwNumberOfBytesTransfered);
}

void APCIO()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_WRITE, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開前面創建的文件
BYTE buffer[10] = {'f', 'f', 'f'};
OVERLAPPED ol = {0};
ol.Offset = 5;

BOOL rt = WriteFileEx(hFile, buffer, 3, &ol, myCallback);//跟前面的WriteFile不一樣,這里會返回TRUE.

//再寫一次
BYTE buffer2[10] = {'g', 'g', 'g', 'g'};
OVERLAPPED ol2 = {0};
ol2.Offset = 8;

rt = WriteFileEx(hFile, buffer2, 4, &ol2, myCallback);

// Sleep(5000);//在這5秒內,就算異步IO完成了,回調函數也不會被執行。因為回調函數只能在當前線程中執行,但是這個線程在做其他事情:睡覺。

DWORD dwError = SleepEx(10000, TRUE);//如果APC被成功調用的話,那么這個函數會返回,同時返回值是WAIT_IO_COMPLETION.SleepEx跟Sleep的區別是SleepEx是可提醒的,如果
//當前線程的APC隊列里面有東西了,那么線程將被喚醒,並且執行回調函數。回調函數結束后,SleepEx也將返回。

if(dwError == WAIT_IO_COMPLETION)
{
printf("APC successfully\n");
}

CloseHandle(hFile);

}

/* 完成端口

*/


class MyOverlapped: public OVERLAPPED
{
public:
MyOverlapped()
{
memset(this, 0, sizeof(MyOverlapped));

m_buf = new BYTE[10];
memset(m_buf, 0, 10);
}

~MyOverlapped()
{
if(m_buf)
{
delete m_buf;
m_buf = NULL;
}
}

BYTE* GetByte() const{return m_buf;}

protected:
BYTE* m_buf;
};

void WorkThread(void* pv)
{
HANDLE hCP = (HANDLE)pv;

while (TRUE)
{
DWORD dwTransfer = 0;
MyOverlapped* ol = NULL;

ULONG_PTR key = 0;
//查詢完成端口上的完成隊列,看看是否有已經完成的異步IO.
BOOL rt = GetQueuedCompletionStatus(hCP, &dwTransfer, &key, (LPOVERLAPPED*)&ol, INFINITE);//第三個參數不可以傳0,不然會得到一個998的錯誤
DWORD err = GetLastError();
if (!rt)
{
break;
}

printf("IOCP recv: %d bytes, thread id: %d\n", dwTransfer, GetCurrentThreadId());
}
}

void IOCP()
{
//完成端口是一個內核對象,但是這個創建函數為什么沒有安全參數呢?
//據說所有的內核對象里面,就是完成端口是個例外。根據書上說的,可能是因為完成端口的設計初衷就是在一個進程里面使用。
HANDLE hCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//創建一個完成端口對象,最后一個參數,表示使用默認的並發線程數,CPU的數量。

HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_ALL, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開一個設備

//將這個設備關聯到完成端口,這樣當設備上的異步IO完成的時候,就會通知完成端口的完成隊列。
HANDLE h = CreateIoCompletionPort(hFile, hCP, NULL, 0);

assert(h == hCP);//返回值肯定和之前已經創建的完成端口對象一樣

MyOverlapped ol;//使用一個自定義的OVERLAPPED
ol.Offset = 4;

BOOL rt = ReadFile(hFile, ol.GetByte(), 4, NULL, &ol);//發起一個異步IO

MyOverlapped ol2;
ol2.Offset = 5;

rt = ReadFile(hFile, ol2.GetByte(), 5, NULL, &ol2);//再發起一個異步IO

DWORD dwErr = GetLastError();

//一般會創建CPU個數 * 2 個處理線程。
_beginthread(WorkThread, 0, hCP);//啟動一個處理線程。
_beginthread(WorkThread, 0, hCP);//再啟動一個處理線程。

Sleep(2000);

CloseHandle(hFile);
CloseHandle(hCP);
}

/*線程池 + 完成端口
*/

VOID CALLBACK OverlappedCompletionRoutine(PTP_CALLBACK_INSTANCE pInstance,
PVOID pvContext,
PVOID pOverlapped,
ULONG IoResult,
ULONG_PTR NumberOfBytesTransferred,
PTP_IO pIo)
{
printf("OverlappedCompletionRoutine, transferred: %d bytes\n", NumberOfBytesTransferred);
}



void IOCP_ThreadPool()
{
HANDLE hFile = CreateFileW(L"d:\\test.txt", GENERIC_ALL, 0, 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);//打開一個設備

PTP_IO pio = CreateThreadpoolIo(hFile, OverlappedCompletionRoutine, NULL, NULL);//將設備對象和線程池的IO完成端口關聯起來。
DWORD err = GetLastError();


MyOverlapped ol;//使用一個自定義的OVERLAPPED
ol.Offset = 4;

StartThreadpoolIo(pio);//每次發起一個異步io請求的時候,都要調一下這個函數,不然在CloseThreadpoolIo()的時候會出異常。
BOOL rt = ReadFile(hFile, ol.GetByte(), 4, NULL, &ol);//發起一個異步IO


MyOverlapped ol2;
ol2.Offset = 5;

StartThreadpoolIo(pio);
rt = ReadFile(hFile, ol2.GetByte(), 5, NULL, &ol2);//再發起一個異步IO


Sleep(4000);

CloseHandle(hFile);//關閉文件對象
CloseThreadpoolIo(pio);//關閉線程池io完成對象
}

int _tmain(int argc, _TCHAR* argv[])
{
DeviceObjIO();

EventObjIO();

APCIO();

IOCP();

IOCP_ThreadPool();

return 0;
}





注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com