动静队列是 Linux IPC 中很常用的一种通信体例,今天禀析一下Posix动静队列,本文中所讲的动静队列均为Posix动静队列。

什么是 Posix 动静队

动静队列能够认为它是一个动静链表,有足够写权限的历程能够往队列中发送动静,有足够读权限的历程能够往队列中领受动静。

每个动静都是一个记录,它由发送者付与一个优先级。在某个历程往一个队列写入动静之前,其实不需要别的某个历程在该队列上期待动静的抵达。那也就申明动静队列具有随内核的持续性,也就是说历程封闭后,动静队列仍然存在,除非内核从头自举。

Posix 动静队列有如下特点:

对Posix动静队列的读老是返回优先级更高最早的动静。当往空的动静队列中放置一个动静时,Posix动静队列允许产生一个信号或者启动一个领受线程。

Posix 动静队列中的每条动静凡是具有以部属性:

一个暗示优先级的整数;动静的数据部门的长度;动静数据自己;

动静队列的根本操做

翻开或创建一个 posix 动静队列操做接口

mqd_t mq_oPEn(const char *name, int oflag, mode_t mode, struct mq_attr *attr); Link with -lrt.

参数 name 为 posix IPC 名字, 即将要被翻开或创建的动静队列对象,为了便于移植,需要指定为“/name”的格局。

参数oflag必需要有O_RDONLY(只读)、标记O_RDWR (读写), O_WRONLY(只写)之一,除此之外还能够指定 O_CREAT(没有该对象则创建)、O_EXCL(若是 O_CREAT 指定,但 name 不存在,就返回错误),O_NONBLOCK(以非阻塞体例翻开动静队列,在一般情况下mq_receive和mq_send 函数会阻塞的处所,利用该标记翻开的动静队列会返回 EAGAIN 错误)。

当操做一个新队列时,利用 O_CREAT 标识,此时后面两个参数需要被指定,参数 mode 为指定权限位,attr 指定新创建队列的属性。

封闭历程描述符操做接口

int mq_close(mqd_t mqdes);

封闭之后告诉历程不在利用该描述符,但动静队列不会从系统中删除。

系统中删除某个动静队列操做接口

int mq_unlink(const char *name);

参数为mq_open()函数第一个参数,挪用该接口后删除会马上发作,即便该队列的描述符引用计数仍然大于0。

关于动静队列中设置和和获打消息队列属性接口

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

每个动静队列有四个属性, mq_getattr返回所有的那些属性, mq_setattr设置此中的某个属性

动静队列的动静详细属性如下

struct mq_attr {long mq_flags; /* Flags: 0 or O_NONBLOCK */long mq_maxmsg; /* Max. # of messages on queue */long mq_msgsize; /* Max. message size (bytes) */long mq_curmsgs; /* # of messages currently in queue */}

指向mq_attr的指针能够做为mq_open函数的第四个参数传递, 从而在创建队列初就设置好每个动静的更大长度和允许存在的更大动静数量, 别的两个成员被忽略。

mq_setattr给所指定队列设置属性, 但是只利用由attr指向的mq_attr构造的mq_flags成员, 以设置或肃清非阻塞标记, 其他三个成员则被忽略(此中两个只能在创建队列时指定, 还有一个及时获取)。当然, mq_setattr的最初一个参数用于领受之前的属性和当前形态

向动静队列放置和取走动静的操做接口

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_PRio); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

参数msg_ptr为指向动静的指针。

msg_len为动静长度,该值不克不及大于属性值中mq_msgsize的值。

msg_prio为优先级,动静在队列中将根据优先级大小挨次来摆列动静。

若是动静队列已满,mq_send()函数将阻塞,曲到队列有可用空间再次允许放置动静或该挪用被信号打断;若是O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。

若是队列空,mq_receive()函数将阻塞,曲到动静队列中有新的动静;若是O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。

动静队列的原理阐发

动静队列的初始化

static int __init init_mqueue_fs(void){ ... //注册动静队列文件系统 error = register_filesystem(&mqueue_fs_type); //构建struct vfsmount构造次要是获取文件系统的super_block对象与根目次的inode与dentry对象,并将那些对象参加到系统链表 if (IS_ERR(mqueue_mnt = kern_mount(&mqueue_fs_type))) { ... } queues_count = 0; spin_lock_init(&mq_lock); return 0;out_filesystem:out_sysctl: return error;}__initcall(init_mqueue_fs);

动静队列文件系统初始化很简单,次要工做如下:

注册文件系统,把 mqueue_fs_type 参加到 file_systems 链表中。构建struct vfsmount构造,把获取的文件系统的super_block对象与根目次的 inode 与 dentry 对象,并将那些对象参加到系统链表中。

mq_open 接口阐发

asmlinkage long sys_mq_open(const char __user *u_name, int oflag, mode_t mode, struct mq_attr __user *u_attr){ ... //获取一个未利用的文件描述符 fd = get_unused_fd(); mutex_lock(&mqueue_mnt->mnt_root->d_inode->i_mutex); //获取一个名字为name的dentry构造 dentry = lookup_one_len(name, mqueue_mnt->mnt_root, strlen(name)); mntget(mqueue_mnt); //若是新创建 if (oflag & O_CREAT) { if (dentry->d_inode) { /* entry already exists */ audit_inode(name, dentry); error = -EEXIST; if (oflag & O_EXCL) goto out; //若已经存在,则间接翻开file filp = do_open(dentry, oflag); } else { //创建一个file构造,把inode和dentry与之联系关系 filp = do_create(mqueue_mnt->mnt_root, dentry, oflag, mode, u_attr); } } else { //不然,间接获取 error = -ENOENT; if (!dentry->d_inode) goto out; audit_inode(name, dentry); filp = do_open(dentry, oflag); } if (IS_ERR(filp)) { error = PTR_ERR(filp); goto out_putfd; } //给描述符设置close_on_exec标记 set_close_on_exec(fd, 1); //文件描述符与file停止联系关系 fd_install(fd, filp); goto out_upsem; ... return fd;}

mq_open 的操做很简单,操做如下:

获取一个未利用的文件描述符 fd;按照参数name获取一个 dentry;按照 oflag 暗示判断能否是新创建仍是利用已存在的 file,若新创建,则生成一个 file 构造,同时与 fd 、 inode、dentry 停止联系关系;不然翻开一个已存在的file。

mq_send 接口阐发

当利用 mq_send 发送动静时,好比如下挪用,

mq_send(mqd, msg, msg_len, msg_prio)

发送动静时,最末会挪用如下函数

mq_timedsend(mqd, msg, msg_len, msg_prio, NULL)

asmlinkage long sys_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec __user *u_abs_timeout){ ...//获取file构造filp = fget(mqdes);if (unlikely(!filp))goto out;inode = filp->f_path.dentry->d_inode;//获取inode 下的 mqueue_inode_infoinfo = MQUEUE_I(inode);//把用户传来的动静转换成内核动静链表,若用户动静长度大于PAGE_SIZE,则链表构造为 msg_msg-> msg_msgseg -> msg_msgseg 每个节点都已一个页大小,除了最初一个节点//若用户动静大小小于PAGE_SIZE,链表只要一个节点 msg_msg // msg_ptr 指向链表头节点 msg_msgmsg_ptr = load_msg(u_msg_ptr, msg_len);//msg_msg 中记录动静的总长度,和 动静优先级msg_ptr->m_ts = msg_len; msg_ptr->m_type = msg_prio;spin_lock(&info->lock);//若动静数量到达更大值if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {//若非阻塞,则返回if (filp->f_flags & O_NONBLOCK) {spin_unlock(&info->lock);ret = -EAGAIN;//若超不时间小于0,则返回超不时间} else if (unlikely(timeout < 0)) {spin_unlock(&info->lock);ret = timeout;} else {//阻塞挪用,则历程休眠,把wait参加到info中的SEND期待队列中wait.task = current;wait.msg = (void *) msg_ptr;wait.state = STATE_NONE;ret = wq_sleep(info, SEND, timeout, &wait);}} else {//若info 领受队列中有阻塞的历程,则把要发送的数据挂到阻塞的历程的动静节点上,然后唤醒阻塞的领受历程receiver = wq_get_first_waiter(info, RECV);if (receiver) {pipelined_send(info, msg_ptr, receiver);} else {//把动静挂到动静队列上,并停止通知msg_insert(msg_ptr, info);__do_notify(info);}inode->i_atime = inode->i_mtime = inode->i_ctime =CURRENT_TIME;spin_unlock(&info->lock);ret = 0;}return ret;}

sys_mq_timedsend 功用如下:

发送动静前先把用户动静转换成动静链表。若动静队列满了,则按照前提停止能否阻塞。动静队列未满,若领受队列中存在阻塞的领受历程,则把要发送的数据挂到阻塞的历程的动静节点上,然后唤醒阻塞的领受历程;不然把动静加到动静队列中。

mq_receive 接口阐发

当利用 mq_receive 领受动静时,好比如下挪用,

领受动静时,最末会挪用如下函数

mq_timedreceive(mqd, msg, msg_len, &msg_prio, NULL)详细实现如下asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr, size_t msg_len, unsigned int __user *u_msg_prio, const struct timespec __user *u_abs_timeout){ ... //获取file构造 filp = fget(mqdes); inode = filp->f_path.dentry->d_inode; //获取inode 下的 mqueue_inode_info info = MQUEUE_I(inode); audit_inode(NULL, filp->f_path.dentry); spin_lock(&info->lock); //动静队列当前没有动静 if (info->attr.mq_curmsgs == 0) { //若非阻塞,则返回 if (filp->f_flags & O_NONBLOCK) { spin_unlock(&info->lock); ret = -EAGAIN; msg_ptr = NULL; //若超不时间小于0,则返回超不时间 } else if (unlikely(timeout < 0)) { } else { //阻塞挪用,则历程休眠,把wait参加到info中的RECV期待队列中 wait.task = current; wait.state = STATE_NONE; ret = wq_sleep(info, RECV, timeout, &wait); msg_ptr = wait.msg; } } else { //从动静队列的更高优先级中获取一个动静 msg_ptr = msg_get(info); inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; //因为从动静队列中已经取出一个动静了,有剩余的空间,因而把期待SEND队列上的历程的动静挂到动静队里中,然后唤醒发送动静的历程 pipelined_receive(info); spin_unlock(&info->lock); ret = 0; } if (ret == 0) { ret = msg_ptr->m_ts; //把动静拷贝到用户态 if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) || store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) { ret = -EFAULT; } //释放动静空间 free_msg(msg_ptr); }out_fput: fput(filp);out: return ret;}

sys_mq_timedreceive 功用如下:

获打消息前,先判断动静队列中能否有动静,若没有,则按照前提停止能否阻塞。若动静队列中有动静,则从动静队列的获取一个更高优先级的动静。唤醒因为动静队列满而阻塞的发送历程。把动静拷贝到用户缓冲区中。

mq_unlink 接口阐发

asmlinkage long sys_mq_unlink(const char __user *u_name){ ... //引用计数减1,删除目次项 err = vfs_unlink(dentry->d_parent->d_inode, dentry); return err;}

该函数感化很简单,就是削减引用计数,删除目次项。

颠末上述的阐发,有关动静队列的内存构造能够总结如下:

历程间通信(IPC) 系列 | Posix 动静队列  第1张