linux進程間通訊-消息隊列


1、什么是消息隊列

 1.1 基本概念

  消息隊列提供了一種從一個進程向另一個進程發送一個數據塊的方法。每個數據塊都含有一個類型,接收進程可以選擇地接收含有不同類型的數據結構。消息隊列沒有命名管道的同步和阻塞問題,但是消息隊列與命名管道一樣,每個數據塊都有一個最大長度的限制。Linux用宏MSGMAX和MSGMNB來限制一條消息的最大長度和一個隊列的最大長度。
  每個消息隊列都有一個隊列頭。在內核中用結構struct msg_queue來描述,隊列頭中包含了該消息隊列的大量信息,包括消息隊列鍵值、用戶ID、組ID、消息隊列中消息數目等等,甚至記錄了最近對消息隊列讀寫進程的ID。讀者可以訪問這些信息,也可以設置其中的某些信息。在用戶空間,消息隊列頭用結構struct msqid_ds來描述,所以在用戶空間我們用結構msqid_ds用來設置或返回消息隊列的信息,其實它是把內核中的數據結構struct msg_queue的內容讀到用戶空間的數據結構struct msqid_ds中而已。
  下圖說明了內核與消息隊列是怎樣建立起聯系的,其中,struct ipc_ids msg_ids是內核中記錄消息隊列的全局數據結構。
這里寫圖片描述

 1.2 數據結構

用戶空間數據結構
a、struct msqid_ds結構體

    struct msqid_ds {
  struct ipc_perm msg_perm;
  struct msg *msg_first; /* first message on queue,unused */
  struct msg *msg_last; /* last message in queue,unused */
  __kernel_time_t msg_stime; /* last msgsnd time */
  __kernel_time_t msg_rtime; /* last msgrcv time */
  __kernel_time_t msg_ctime; /* last change time */
  unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
  unsigned long msg_lqbytes; /* ditto */
  unsigned short msg_cbytes; /* current number of bytes on queue */
  unsigned short msg_qnum; /* number of messages in queue */
  unsigned short msg_qbytes; /* max number of bytes on queue */
   __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
   __kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};

  msgque[MSGMNI]是一個msqid_ds結構的指針數組,每個msqid_ds結構指針代表一個系統消息隊列,msgque[MSGMNI]的大小為MSGMNI=128,也就是說系統最多有MSGMNI=128個消息隊列。一個struct msqid_ds代表一個消息隊列。

b、struct msgbuf結構體

     struct msgbuf {
  long mtype;
  char mtext[1];
};

  消息隊列最大的靈活性在於,我們可以自己定義傳遞給隊列的消息的數據類型的。不過這個類型並不是隨便定義的,msgbuf結構給了我們一個這類數據類型的基本結構定義。

內核空間數據結構
(貼出來只是為了更好的理解消息隊列,用戶使用時可以無視)
a、struct msg_queue結構體

        /* one msq_queue structure for each present queue on the system */
struct msg_queue {
  struct kern_ipc_perm q_perm;
   time_t q_stime; /* last msgsnd time */
   time_t q_rtime; /* last msgrcv time */
   time_t q_ctime; /* last change time */
  unsigned long q_cbytes; /* current number of bytes on queue */
  unsigned long q_qnum; /* number of messages in queue */
  unsigned long q_qbytes; /* max number of bytes on queue */
  pid_t q_lspid; /* pid of last msgsnd */
  pid_t q_lrpid; /* last receive pid */
  struct list_head q_messages;
  struct list_head q_receivers;
  struct list_head q_senders;
};

  當發送一個消息到該消息隊列時,把發送的消息構造成一個msg結構對象,並添加到q_messages隊列中,接收消息的時候也是從q_messages隊列尾部查找到一個msg_type匹配的msg節點,從鏈表隊列中刪除該msg節點。
  
b、struct msg_msg結構體

     /* one msg_msg structure for each message */
struct msg_msg {
  struct list_head m_list;
  long m_type;
  size_t m_ts; /* message text size */
  struct msg_msgseg *next;
  void *security;
  /* the actual message follows immediately */
};

  消息隊列在系統內核中是以消息鏈表的形式出現的,而完成消息鏈表每個節點結構定義的就是msg_msg結構。

2、消息隊列的system_v調用

  1. msgget函數
    該函數用來創建和訪問一個消息隊列。它的原型為:
    int msgget(key_t, key, int msgflg);
      與其他的IPC機制一樣,程序必須提供一個鍵來命名某個特定的消息隊列。
      msgflg是一個權限標志,表示消息隊列的訪問權限,它與文件的訪問權限一樣。msgflg可以與IPC_CREAT做或操作,表示當key所命名的消息隊列不存在時創建一個消息隊列,如果key所命名的消息隊列存在時,IPC_CREAT標志會被忽略,而只返回一個標識符。它返回一個以key命名的消息隊列的標識符(非零整數),失敗時返回-1

  2. msgsnd函數
    該函數用來把消息添加到消息隊列中。它的原型為:
    int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);
      msgid是由msgget函數返回的消息隊列標識符。
      msg_ptr是一個指向准備發送消息的指針,但是消息的數據結構卻有一定的要求,指針msg_ptr所指向的消息結構一定要是以一個長整型成員變量開始的結構體,接收函數將用這個成員來確定消息的類型。
      msg_sz是msg_ptr指向的消息的長度,注意是消息的長度,而不是整個結構體的長度,也就是說msg_sz是不包括長整型消息類型成員變量的長度。
      如果調用成功,消息數據的一分副本將被放到消息隊列中,並返回0,失敗時返回-1.

  3. msgrcv函數
    該函數用來從一個消息隊列獲取消息,它的原型為:
    int msgrcv(int msgid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
      msgtype可以實現一種簡單的接收優先級。如果msgtype為0,就獲取隊列中的第一個消息。如果它的值大於零,將獲取具有相同消息類型的第一個信息。如果它小於零,就獲取類型等於或小於msgtype的絕對值的第一個消息。
      msgflg用於控制當隊列中沒有相應類型的消息可以接收時將發生的事情。
      調用成功時,該函數返回放到接收緩存區中的字節數,消息被復制到由msg_ptr指向的用戶分配的緩存區中,然后刪除消息隊列中的對應消息。失敗時返回-1.
  4. msgctl函數
    該函數用來控制消息隊列,它與共享內存的shmctl函數相似,它的原型為:
    int msgctl(int msgid, int command, struct msgid_ds *buf);
    成功時返回0,失敗時返回-1。command是將要采取的動作,它可以取3個值,
    IPC_STAT:把msgid_ds結構中的數據設置為消息隊列的當前關聯值,即用消息隊列的當前關聯值覆蓋msgid_ds的值。
    IPC_SET:如果進程有足夠的權限,就把消息列隊的當前關聯值設置為msgid_ds結構中給出的值
    IPC_RMID:刪除消息隊列
      

3、消息隊列與命名管道相比

1、消息隊列也可以獨立於發送和接收進程而存在,從而消除了在同步命名管道的打開和關閉時可能產生的困難。
2、同時通過發送消息還可以避免命名管道的同步和阻塞問題,不需要由進程自己來提供同步方法。
3、接收程序可以通過消息類型有選擇地接收數據,而不是像命名管道中那樣,只能默認地接收。

4、使用例子

  由於消息隊列可以讓不相關的進程進行行通信,所以我們在這里將會編寫兩個程序,msgreceive和msgsned來表示接收和發送信息。根據正常的情況,我們允許兩個程序都可以創建消息,但只有接收者在接收完最后一個消息之后,它才把它刪除。
  msgreceive.c文件

#include <unistd.h> 
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>

struct msg_st
{
long int msg_type;
char text[BUFSIZ];
};

int main()
{
int running = 1;
int msgid = -1;
struct msg_st data;
long int msgtype = 0; //注意1

//建立消息隊列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
//從隊列中獲取消息,直到遇到end消息為止
while(running)
{
if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1)
{
fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
exit(EXIT_FAILURE);
}
printf("You wrote: %s\n",data.text);
//遇到end結束
if(strncmp(data.text, "end", 3) == 0)
running = 0;
}
//刪除消息隊列
if(msgctl(msgid, IPC_RMID, 0) == -1)
{
fprintf(stderr, "msgctl(IPC_RMID) failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}

  msgsend.c文件

    #include <unistd.h> 
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>

#define MAX_TEXT 512
struct msg_st
{
long int msg_type;
char text[MAX_TEXT];
};

int main()
{
int running = 1;
struct msg_st data;
char buffer[BUFSIZ];
int msgid = -1;

//建立消息隊列
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if(msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}

//向消息隊列中寫消息,直到寫入end
while(running)
{
//輸入數據
printf("Enter some text: ");
fgets(buffer, BUFSIZ, stdin);
data.msg_type = 1; //注意2
strcpy(data.text, buffer);
//向隊列發送數據
if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
//輸入end結束輸入
if(strncmp(buffer, "end", 3) == 0)
running = 0;
sleep(1);
}
exit(EXIT_SUCCESS);
}

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com