POSIX異步I/O接口使用


POSIX1003.1b 實時擴展協議規定的標准異步 I/O 接口,即 aio_read 函數、 aio_write 函數、aio_fsync 函數、aio_cancel 函數、aio_error 函數、aio_return 函數、aio_suspend函數和 lio_listio 函數。這組 API 用來操作異步 I/O。

異步 I/O 是針對同步 I/O 提出的概念,它不需要線程等待 I/O 結果,而只需要請求進行傳輸,然后系統會自動完成 I/O 傳輸,結束或者出現錯誤時會產生相應的 I/O 信號,用戶程序只需要設置好對應的信號陷入函數,即可處理一個異步 I/O 事件。

        #man aio

        The POSIX AIO interface consists of the following functions:

        aio_read(3)     Enqueue a read request.  This is the asynchronous analog of read(2).

        aio_write(3)    Enqueue a write request.  This is the asynchronous analog of write(2).

        aio_fsync(3)    Enqueue a sync request for the I/O operations on a file descriptor.  This is the asynchronous analog of fsync(2) and fdatasync(2).

        aio_error(3)    Obtain the error status of an enqueued I/O request.

        aio_return(3)   Obtain the return status of a completed I/O request.

        aio_suspend(3)  Suspend the caller until one or more of a specified set of I/O requests completes.

        aio_cancel(3)   Attempt to cancel outstanding I/O requests on a specified file descriptor.

        lio_listio(3)   Enqueue multiple I/O requests using a single function call.

        The aiocb ("asynchronous I/O control block") structure defines parameters that control an I/O operation.  An argument of this type is employed with all  of  the  functions  listed
        above.  This structure has the following form:

           #include <aiocb.h>

           struct aiocb {
               /* The order of these fields is implementation-dependent */

               int             aio_fildes;     /* File descriptor */
               off_t           aio_offset;     /* File offset */
               volatile void  *aio_buf;        /* Location of buffer */
               size_t          aio_nbytes;     /* Length of transfer */
               int             aio_reqprio;    /* Request priority */
               struct sigevent aio_sigevent;   /* Notification method */
               int             aio_lio_opcode; /* Operation to be performed;
                                                  lio_listio() only */

               /* Various implementation-internal fields not shown */
           };

            struct sigevent{
                int             sigev_notify;    // notify type
                int             sigev_signo;     // signal number
                union sigval    sigev_value;     // notify argument
                void (*sigev_notify_function)(union sigval);    // notify function
                pthread_attr_t *sigev_notify_attributes;        // notify attrs
            };

在 aiocb 結構中,aio_fildes 字段表示被打開用來讀或寫的文件描述符。

讀或寫操作從 aio_offset 指定的偏移量開始(注意,異步 I/O 操作必須顯示地指定偏移量。只要不在同一個進程里把異步 I/O 函數和傳統 I/O 函數混在一起用在同一個文件上,異步 I/O 接口是不會影響操作系統維護的文件偏移量的。

另外,如果使用異步 I/O 接口向一個以追加模式打開的文件中寫入數據,aio_offset 字段會被忽略)。

讀寫數據的操作都是在 aio_buf 指定的緩沖區中進行,aio_nbytes 字段則包含了要讀寫的字節數。

aio_reqprio 為異步 I/O 請求提示順序(但系統對該順序的控制力有限,因此不一定遵循)。

LIO_NOP 沒有傳輸請求

LIO_READ 請求一個讀操作

LIO_WRITE 請求一個寫操作

LIO_SYNC 請求異步 I/O 同步

aio_lio_opcode 字段只能用於基於列表的異步 I/O(見下)。aio_sigevent 使用 sigevent 結構來控制在 I/O 事件完成后,如何通知應用程序。
在 sigevent 結構中,sigev_notify 字段控制通知的類型,其取值可能是以下 3 個中的一個。

1)SIGEV_NONE:
      異步 I/O 請求完成后,不通知進程。
2)SIGEV_SIGNAL:
      異步 I/O 請求完成后,產生由 sigev_signo 字段指定的信號。如果應用程序已選擇捕捉信號,且在建立信號處理程序時指定了 SA_SIGINFO 標志,那么該信號將被入隊(如果實現支持排隊信號)。
      信號處理程序會傳送給一個 siginfo 結構,該結構的 si_value 字段被設置為 sigev_value(如果使用了 SA_SIGINFO 標志)。
3)SIGEV_THREAD:
      當異步 I/O 請求完成時,由 sigev_notify_function 指定的函數會被調用,sigev_value 字段被作為它的唯一參數傳入。
      除非 sigev_notify_attributes 字段被設置為 pthread 屬性結構的地址,且該結構指定了另一個線程屬性,否則該函數將在分離狀態下的一個單獨的線程中執行。

 


進行異步 I/O 之前需要先初始化 AIO 控制塊。aio_read 和 aio_write 函數可分別用來進行異步讀和異步寫操作。

 

調用 aio_read 函數和 aio_write 函數成功之后,異步 I/O 請求便已經被操作系統放入等待處理的隊列中了。這些返回值與實際 I/O 操作的結果沒有任何關系,如果需要查看函數的返回狀態可調用 aio_error 函數。
如果想強制所有等待中的異步操作不等待而寫入存儲中,可以建立一個 AIO 控制塊並調用 aio_fsync 函數。

 

aio_fsync 在安排了同步后便返回,在異步同步操作完成前,數據不會被持久化。AIO 控制塊中的 aio_fields 字段指定了其異步寫操作被同步的文件。如果 op 參數被設置為 O_DSYNC,那么操作執行起來就會像調用了 fdatasync 一樣。否則,如果 op 參數被設置為 O_SYNC,則操作執行起來就會像調用了 fsync 一樣。
aio_error 的返回值為下面 4 種情況中的一種。

10:表示異步操作成功完成,需要調用 aio_return 函數來獲取操作返回值。
(2)-1:表示對 aio_error 的調用失敗,這可以通過查看 errno 來獲知失敗原因。
(3)EINPROGRESS:表示異步讀、寫或同步操作仍在等待。
(4)其他情況:其他任何的返回值是相關的異步操作失敗返回的錯誤碼。

 


直到異步操作完成之前,都需要小心不要調用 aio_return 函數,否則結果是未定義的。而且還要小心對每個異步操作只調用一次 aio_return,因為一旦調用了該函數,操作系統就可以釋放掉包含了 I/O 操作返回值的記錄。如果 aio_return 本身失敗,就會返回 -1,並設置 errno。否則其他情況下,它將直接返回 read、write 或 fsync 被成功調用時的結果。
執行 I/O 操作時,如果還有其他事務要處理而不想被 I/O 操作阻塞,就可以使用異步 I/O。但如果在完成了所有事務后還有異步操作未完成時,就可以調用 aio_suspend 函數來阻塞進程,直到操作完成。

RETURN VALUE
       If this function returns after completion of one of the I/O requests specified in aiocb_list, 0 is returned.  Otherwise, -1 is returned, and errno is set to indicate the error.

ERRORS
       EAGAIN The call timed out before any of the indicated operations had completed.

       EINTR  The call was ended by signal (possibly the completion signal of one of the operations we were waiting for); see signal(7).

 




 

 

而當還有不想再完成的等待中的異步 I/O 操作時,可以嘗試用 aio_cancel 函數來取消它們。

aio_cancel 會返回以下情況中的一種:

       AIO_CANCELED 所有請求的操作已被取消
              All requests were successfully canceled.

       AIO_NOTCANCELED 至少有一個請求操作沒被取消
              At least one of the requests specified was not canceled because it was in progress.  In this case, one may check the status of individual requests using aio_error(3).

       AIO_ALLDONE 在請求取消之前已經完成
              All requests had already been completed before the call.

       -1     An error occurred.  The cause of the error can be found by inspecting errno.

 

以下是一個來自 man 手冊的 例程

       #include <fcntl.h>
       #include <stdlib.h>
       #include <unistd.h>
       #include <stdio.h>
       #include <errno.h>
       #include <aio.h>
       #include <signal.h>

       #define BUF_SIZE 20     /* Size of buffers for read operations */

       #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)

       #define errMsg(msg)  do { perror(msg); } while (0)

       struct ioRequest {      /* Application-defined structure for tracking
                                  I/O requests */
           int           reqNum;
           int           status;
           struct aiocb *aiocbp;
       };

       static volatile sig_atomic_t gotSIGQUIT = 0;
                               /* On delivery of SIGQUIT, we attempt to
                                  cancel all outstanding I/O requests */

       static void             /* Handler for SIGQUIT */
       quitHandler(int sig)
       {
           gotSIGQUIT = 1;
       }

       #define IO_SIGNAL SIGUSR1   /* Signal used to notify I/O completion */

       static void                 /* Handler for I/O completion signal */
       aioSigHandler(int sig, siginfo_t *si, void *ucontext)
       {
           if (si->si_code == SI_ASYNCIO) {
               write(STDOUT_FILENO, "I/O completion signal received\n", 31);

               /* The corresponding ioRequest structure would be available as
                      struct ioRequest *ioReq = si->si_value.sival_ptr;
                  and the file descriptor would then be available via
                      ioReq->aiocbp->aio_fildes */
           }
       }

       int
       main(int argc, char *argv[])
       {
           struct ioRequest *ioList;
           struct aiocb *aiocbList;
           struct sigaction sa;
           int s, j;
           int numReqs;        /* Total number of queued I/O requests */
           int openReqs;       /* Number of I/O requests still in progress */

           if (argc < 2) {
               fprintf(stderr, "Usage: %s <pathname> <pathname>...\n",
                       argv[0]);
               exit(EXIT_FAILURE);
           }

           numReqs = argc - 1;

           /* Allocate our arrays */

           ioList = calloc(numReqs, sizeof(struct ioRequest));
           if (ioList == NULL)
               errExit("calloc");

           aiocbList = calloc(numReqs, sizeof(struct aiocb));
           if (aiocbList == NULL)
               errExit("calloc");

           /* Establish handlers for SIGQUIT and the I/O completion signal */

           sa.sa_flags = SA_RESTART;
           sigemptyset(&sa.sa_mask);

           sa.sa_handler = quitHandler;
           if (sigaction(SIGQUIT, &sa, NULL) == -1)
               errExit("sigaction");

           sa.sa_flags = SA_RESTART | SA_SIGINFO;
           sa.sa_sigaction = aioSigHandler;
           if (sigaction(IO_SIGNAL, &sa, NULL) == -1)
               errExit("sigaction");

           /* Open each file specified on the command line, and queue
              a read request on the resulting file descriptor */

           for (j = 0; j < numReqs; j++) {
               ioList[j].reqNum = j;
               ioList[j].status = EINPROGRESS;
               ioList[j].aiocbp = &aiocbList[j];

               ioList[j].aiocbp->aio_fildes = open(argv[j + 1], O_RDONLY);
               if (ioList[j].aiocbp->aio_fildes == -1)
                   errExit("open");
               printf("opened %s on descriptor %d\n", argv[j + 1],
                       ioList[j].aiocbp->aio_fildes);

               ioList[j].aiocbp->aio_buf = malloc(BUF_SIZE);
               if (ioList[j].aiocbp->aio_buf == NULL)
                   errExit("malloc");

               ioList[j].aiocbp->aio_nbytes = BUF_SIZE;
               ioList[j].aiocbp->aio_reqprio = 0;
               ioList[j].aiocbp->aio_offset = 0;
               ioList[j].aiocbp->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
               ioList[j].aiocbp->aio_sigevent.sigev_signo = IO_SIGNAL;
               ioList[j].aiocbp->aio_sigevent.sigev_value.sival_ptr =
                                       &ioList[j];

               s = aio_read(ioList[j].aiocbp);
               if (s == -1)
                   errExit("aio_read");
           }

           openReqs = numReqs;

           /* Loop, monitoring status of I/O requests */

           while (openReqs > 0) {
               sleep(3);       /* Delay between each monitoring step */

               if (gotSIGQUIT) {

                   /* On receipt of SIGQUIT, attempt to cancel each of the
                      outstanding I/O requests, and display status returned
                      from the cancellation requests */

                   printf("got SIGQUIT; canceling I/O requests: \n");

                   for (j = 0; j < numReqs; j++) {
                       if (ioList[j].status == EINPROGRESS) {
                           printf("    Request %d on descriptor %d:", j,
                                   ioList[j].aiocbp->aio_fildes);
                           s = aio_cancel(ioList[j].aiocbp->aio_fildes,
                                   ioList[j].aiocbp);
                           if (s == AIO_CANCELED)
                               printf("I/O canceled\n");
                           else if (s == AIO_NOTCANCELED)
                               printf("I/O not canceled\n");
                           else if (s == AIO_ALLDONE)
                               printf("I/O all done\n");
                           else
                               errMsg("aio_cancel");
                       }
                   }

                   gotSIGQUIT = 0;
               }

               /* Check the status of each I/O request that is still
                  in progress */

               printf("aio_error():\n");
               for (j = 0; j < numReqs; j++) {
                   if (ioList[j].status == EINPROGRESS) {
                       printf("    for request %d (descriptor %d): ",
                               j, ioList[j].aiocbp->aio_fildes);
                       ioList[j].status = aio_error(ioList[j].aiocbp);

                       switch (ioList[j].status) {
                       case 0:
                           printf("I/O succeeded\n");
                           break;
                       case EINPROGRESS:
                           printf("In progress\n");
                           break;
                       case ECANCELED:
                           printf("Canceled\n");
                           break;
                       default:
                           errMsg("aio_error");
                           break;
                       }

                       if (ioList[j].status != EINPROGRESS)
                           openReqs--;
                   }
               }
           }

           printf("All I/O requests completed\n");

           /* Check status return of all I/O requests */

           printf("aio_return():\n");
           for (j = 0; j < numReqs; j++) {
               ssize_t s;

               s = aio_return(ioList[j].aiocbp);
               printf("    for request %d (descriptor %d): %zd\n",
                       j, ioList[j].aiocbp->aio_fildes, s);
           }

           exit(EXIT_SUCCESS);
       }

 



 


 

ref:https://blog.csdn.net/aisxyz/article/details/84915025


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com