Linux异步IO+实例(POSIX IO与 libaio)


异步IO基本API

API函数 说明
aio_read 异步读操作
aio_write 异步写操作
aio_error 检查异步请求的状态
aio_return 获得异步请求完成时的返回值
aio_suspend 挂起调用进程,直到一个或多个异步请求已完成
aio_cancel 取消异步请求
lio_list 发起一系列异步I/O请求

上述API调用都会用到 struct aiocb 结构体:

struct aiocb {
int aio_fildes; //文件描述符
off_t aio_offset; //文件偏移量
volatile void *aio_buf; //缓冲区
size_t aio_nbytes; //数据长度
int aio_reqprio; //请求优先级
struct sigevent aio_sigevent; //通知方式
int aio_lio_opcode; //要执行的操作
};

编译时加参数 -lrt


aio_error

检查异步请求状态

int aio_error(const struct aiocb *aiocbp);
返回值 含义
EINPROGRESS 该请求尚未完成
ECANCELED 该请求被撤销
0 请求成功完成
出错 错误信息存入errno中

aio_read 异步读请求

int aio_read(struct aiocb *aiocbp);

例子:读取test.txt 文件

#include<stdio.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<errno.h>
#include<fcntl.h>
#include<aio.h>
#include<stdlib.h>
#include<strings.h>

#define BUFSIZE 256

int main()
{
struct aiocb cbp;
int fd,ret;
int i = 0;

fd = open("test.txt",O_RDONLY);

if(fd < 0)
{
perror("open error\n");
}

//填充struct aiocb 结构体
bzero(&cbp,sizeof(cbp));
//指定缓冲区
cbp.aio_buf = (volatile void*)malloc(BUFSIZE+1);
//请求读取的字节数
cbp.aio_nbytes = BUFSIZE;
//文件偏移
cbp.aio_offset = 0;
//读取的文件描述符
cbp.aio_fildes = fd;
//发起读请求
ret = aio_read(&cbp);
if(ret < 0)
{
perror("aio_read error\n");
exit(1);
}

//查看异步读取的状态,直到读取请求完成
for(i = 1;aio_error(&cbp) == EINPROGRESS;i++)
{
printf("No.%3d\n",i);
}
//读取返回值
ret = aio_return(&cbp);
printf("return %d\n",ret);

// sleep(1);
printf("%s\n",(char*)cbp.aio_buf);
close(fd);
return 0;
}

这里写图片描述

从上图看出,循环检查了3次异步读写的状态,指定的256字节才读取完毕,最后返回读取的字节数256。

如果注释掉异步读的状态检查:

    ...

//查看异步读取的状态,直到读取请求完成
/* for(i = 1;aio_error(&cbp) == EINPROGRESS;i++)
{
printf("No.%3d\n",i);
}
ret = aio_return(&cbp);
printf("return %d\n",ret);
*/
...

这里写图片描述

发现什么都没输出,这是因为程序结束的时候,异步读请求还没完成,所以buf缓冲区还没有读进去数据。

如果将上面代码中的 sleep 的注释去掉,让异步请求发起后,程序等待1秒后再输出,就会发现成功读取到了数据。

用GDB单步跟踪上面程序,当发起异步读请求时:

这里写图片描述

看到发起一个异步请求时,Linux实际上是创建了一个线程去处理,当请求完成后结束线程。


aio_write 异步写

int aio_write(struct aiocb *aiocbp);

例子:

#include<stdio.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<errno.h>
#include<fcntl.h>
#include<aio.h>
#include<stdlib.h>
#include<strings.h>
#include<string.h>

int main()
{
struct aiocb cbp;
int fd,ret;
int i = 0;

//以追加的方式打开
fd = open("test.txt",O_WRONLY|O_APPEND);

if(fd < 0)
{
perror("open error\n");
}

//填充struct aiocb 结构体
bzero(&cbp,sizeof(cbp));
//指定缓冲区,将其写入文件
cbp.aio_buf = "hello,world";
//请求写入的字节数
cbp.aio_nbytes = strlen((const char*)cbp.aio_buf);
//文件偏移
cbp.aio_offset = 0;
//文件描述符
cbp.aio_fildes = fd;
//发起写请求
ret = aio_write(&cbp);
if(ret < 0)
{
perror("aio_read error\n");
exit(1);
}

//查看异步写的状态,直到写请求完成
for(i = 1;aio_error(&cbp) == EINPROGRESS;i++)
{
printf("No.%3d\n",i);
}
ret = aio_return(&cbp);
printf("return %d\n",ret);
close(fd);
return 0;
}

上面test.txt文件是以追加的方式打开的,所以不需要设置aio_offset 文件偏移量,不管文件偏移的值是多少都会在文件末尾写入。

可以去掉O_APPEND, 不以追加方式打开文件,这样就可以设置 aio_offset ,指定从何处位置开始写入文件。


aio_suspend 异步阻塞IO

int aio_suspend(const struct aiocb * const aiocb_list[],
int nitems, const struct timespec *timeout);

前面2个例子发起IO请求都是非阻塞的,即,即使IO请求未完成,也不影响调用程序继续执行后面的语句。
我们可以调用aio_suspend 来阻塞一个或多个异步IO, 只需要将IO请求加入阻塞列表。

例子:

#include<stdio.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<errno.h>
#include<fcntl.h>
#include<aio.h>
#include<stdlib.h>
#include<strings.h>

#define BUFSIZE 1024
#define MAX 2

//异步读请求
int aio_read_file(struct aiocb *cbp,int fd,int size)
{
int ret;
bzero(cbp,sizeof(struct aiocb));

cbp->aio_buf = (volatile void*)malloc(size+1);
cbp->aio_nbytes = size;
cbp->aio_offset = 0;
cbp->aio_fildes = fd;

ret = aio_read(cbp);
if(ret < 0)
{
perror("aio_read error\n");
exit(1);
}
}

int main()
{
struct aiocb cbp1,cbp2;
int fd1,fd2,ret;
int i = 0;
//异步阻塞列表
struct aiocb* aiocb_list[2];

fd1 = open("test.txt",O_RDONLY);
if(fd1 < 0)
{
perror("open error\n");
}
aio_read_file(&cbp1,fd1,BUFSIZE);

fd2 = open("test.txt",O_RDONLY);
if(fd2 < 0)
{
perror("open error\n");
}
aio_read_file(&cbp2,fd2,BUFSIZE*4);

//向列表加入两个请求
aiocb_list[0] = &cbp1;
aiocb_list[1] = &cbp2;
//阻塞,直到请求完成才会继续执行后面的语句
aio_suspend((const struct aiocb* const*)aiocb_list,MAX,NULL);
printf("read1:%s\n",(char*)cbp1.aio_buf);
printf("read2:%s\n",(char*)cbp2.aio_buf);

close(fd1);
close(fd2);
return 0;
}

lio_listio 同时发起多个异步IO请求

int lio_listio(int mode, struct aiocb *const aiocb_list[],
int nitems, struct sigevent *sevp);

这个API比较方便,他可以同时发起多个异步IO请求,并且同时可以指定以阻塞还是非阻塞方式。
第一个参数:LIO_WAIT (阻塞) 或 LIO_NOWAIT(非阻塞)
第二个参数:异步IO请求列表

例子:


#include<stdio.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<errno.h>
#include<fcntl.h>
#include<aio.h>
#include<stdlib.h>
#include<strings.h>

#define BUFSIZE 100
#define MAX 2

//异步读结构体
int aio_read_file(struct aiocb *cbp,int fd,int size)
{
int ret;
bzero(cbp,sizeof(struct aiocb));

cbp->aio_buf = (volatile void*)malloc(size+1);
cbp->aio_nbytes = size;
cbp->aio_offset = 0;
cbp->aio_fildes = fd;
cbp->aio_lio_opcode = LIO_READ;
}

int main()
{
struct aiocb cbp1,cbp2;
int fd1,fd2,ret;
int i = 0;
//异步请求列表
struct aiocb* io_list[2];

fd1 = open("test.txt",O_RDONLY);
if(fd1 < 0)
{
perror("open error\n");
}
aio_read_file(&cbp1,fd1,BUFSIZE);

fd2 = open("test.txt",O_RDONLY);
if(fd2 < 0)
{
perror("open error\n");
}
aio_read_file(&cbp2,fd2,BUFSIZE*4);

io_list[0] = &cbp1;
io_list[1] = &cbp2;

lio_listio(LIO_WAIT,(struct aiocb* const*)io_list,MAX,NULL);
printf("read1:%s\n",(char*)cbp1.aio_buf);
printf("read2:%s\n",(char*)cbp2.aio_buf);

close(fd1);
close(fd2);
return 0;
}

异步IO通知机制


异步与同步的区别就是我们不需要等待异步操作返回就可以继续干其他的事情,当异步操作完成时可以通知我们去处理它。

以下两种方式可以处理异步通知:

  • 信号处理
  • 线程回调

信号处理

在发起异步请求时,可以指定当异步操作完成时给调用进程发送什么信号,这样调用收到此信号就会执行相应的信号处理函数。

例子:

#include<stdio.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<errno.h>
#include<fcntl.h>
#include<aio.h>
#include<stdlib.h>
#include<strings.h>
#include<signal.h>

#define BUFSIZE 256

//信号处理函数,参数signo接收的对应的信号值
void aio_handler(int signo)
{
int ret;
printf("异步操作完成,收到通知\n");
}

int main()
{
struct aiocb cbp;
int fd,ret;
int i = 0;

fd = open("test.txt",O_RDONLY);

if(fd < 0)
{
perror("open error\n");
}

//填充struct aiocb 结构体
bzero(&cbp,sizeof(cbp));
//指定缓冲区
cbp.aio_buf = (volatile void*)malloc(BUFSIZE+1);
//请求读取的字节数
cbp.aio_nbytes = BUFSIZE;
//文件偏移
cbp.aio_offset = 0;
//读取的文件描述符
cbp.aio_fildes = fd;
//发起读请求

//设置异步通知方式
//用信号通知
cbp.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
//发送异步信号
cbp.aio_sigevent.sigev_signo = SIGIO;
//传入aiocb 结构体
cbp.aio_sigevent.sigev_value.sival_ptr = &cbp;

//安装信号
signal(SIGIO,aio_handler);
//发起异步读请求
ret = aio_read(&cbp);
if(ret < 0)
{
perror("aio_read error\n");
exit(1);
}
//暂停4秒,保证异步请求完成
sleep(4);
close(fd);
return 0;
}

这里写图片描述

SIGIO 是系统专门用来表示异步通知的信号,当然也可以发送其他的信号,比如将上面的SIGIO替换为 SIGRTMIN+1 也是可以的。

安装信号可以使用signal(),但是signal不可以传递参数进去。所以推荐使用sigaction()函数,可以为信号处理设置更多的东西。

线程回调

顾名思义就是当调用进程收到异步操作完成的通知时,开线程执行设定好的回调函数去处理。无需专门安装信号。

例子:

#include<stdio.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<errno.h>
#include<fcntl.h>
#include<aio.h>
#include<stdlib.h>
#include<strings.h>
#include<signal.h>

#define BUFSIZE 256

//回调函数
void aio_handler(sigval_t sigval)
{
struct aiocb *cbp;
int ret;

printf("异步操作完成,收到通知\n");
//获取aiocb 结构体的信息
cbp = (struct aiocb*)sigval.sival_ptr;

if(aio_error(cbp) == 0)
{
ret = aio_return(cbp);
printf("读请求返回值:%d\n",ret);
}
while(1)
{
printf("正在执行回调函数。。。\n");
sleep(1);
}
}

int main()
{
struct aiocb cbp;
int fd,ret;
int i = 0;

fd = open("test.txt",O_RDONLY);

if(fd < 0)
{
perror("open error\n");
}

//填充struct aiocb 结构体
bzero(&cbp,sizeof(cbp));
//指定缓冲区
cbp.aio_buf = (volatile void*)malloc(BUFSIZE+1);
//请求读取的字节数
cbp.aio_nbytes = BUFSIZE;
//文件偏移
cbp.aio_offset = 0;
//读取的文件描述符
cbp.aio_fildes = fd;
//发起读请求

//设置异步通知方式
//用线程回调
cbp.aio_sigevent.sigev_notify = SIGEV_THREAD;
//设置回调函数
cbp.aio_sigevent.sigev_notify_function = aio_handler;
//传入aiocb 结构体
cbp.aio_sigevent.sigev_value.sival_ptr = &cbp;
//设置属性为默认
cbp.aio_sigevent.sigev_notify_attributes = NULL;

//发起异步读请求
ret = aio_read(&cbp);
if(ret < 0)
{
perror("aio_read error\n");
exit(1);
}
//调用进程继续执行
while(1)
{
printf("主线程继续执行。。。\n");
sleep(1);
}
close(fd);
return 0;
}

这里写图片描述

与信号处理方式相比,如上图,采用线程回调的方式可以使得在处理异步通知的时候不会阻塞当前调用进程。

libaio

上面介绍的aio其实是用户层使用线程模拟的异步io,缺点是占用线程资源而且受可用线程的数量限制。Linux2.6版本后有了libaio,这完全是内核级别的异步IO,IO请求完全由底层自由调度(以最佳次序的磁盘调度方式)。

libaio的缺点是,想要使用该种方式的文件必须支持以O_DIRECT标志打开,然而并不是所有的文件系统都支持。如果你没有使用O_DIRECT打开文件,它可能仍然“工作”,但它可能不是异步完成的,而是变为了阻塞的。

libaio的介绍可参阅:https://janzhou.org/2011/10/libaio-example/

下面是例子:


#include<stdio.h>
#include<fcntl.h>
#include<string.h>
#include<stdlib.h>
#include<libaio.h>
#include<errno.h>
#include<unistd.h>
#include<unistd.h>

#define MAX_COUNT 1024
#define BUF_SIZE 1 * 1024 *1024

#ifndef O_DIRECT
#define O_DIRECT 040000 /* direct disk access hint */
#endif

int main(int args, void *argv[]){
int fd;

void * buf = NULL;

//获取页面大小
int pagesize = sysconf(_SC_PAGESIZE);
//处理对齐
posix_memalign(&buf, pagesize, BUF_SIZE);

memset(buf,'A',BUF_SIZE);

io_context_t ctx;
struct iocb io,*p=&io;
struct io_event e[10];
struct timespec timeout;

memset(&ctx,0,sizeof(ctx));
//创建并初始化context
if(io_setup(MAX_COUNT,&ctx)!=0){
printf("io_setup error\n");
return -1;
}

if((fd = open("./test.txt", O_WRONLY | O_CREAT | O_APPEND | O_DIRECT, 0644))<0) {
perror("open error");
io_destroy(ctx);
return -1;
}

int n = MAX_COUNT;

while(n > 0) {
//写数据
io_prep_pwrite(&io, fd, buf, BUF_SIZE, 0);
//提交请求
if(io_submit(ctx, 1, &p)!=1) {
io_destroy(ctx);
printf("io_submit error\n");
return -1;
}
//获取完成事件
int ret = io_getevents(ctx, 1, 10, e, NULL);
if (ret != 1) {
perror("ret != 1");
break;
}
n--;
}

close(fd);
//销毁context
io_destroy(ctx);
return 0;
}

如果不能正常编译,执行下面命令:

 sudo apt-get install libaio-dev

使用 dstat 测试磁盘读写速率,libaio与普通read/write相比,libaio读写速率稳定,并且速率略高于普通IO。而且read/write读写数据量大时很占用CPU资源。

参阅:
http://blog.csdn.net/shreck66/article/details/48765533
深入浅出异步IO模型

智能推荐

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告