【多線程】C++自實現適用於多線程的原子消息隊列


這些是3年前都自實現的,現在記錄起來,以防忘記丟失,以便后續查閱使用。

代碼:

     xinclude.h文件:

#ifndef _XINCLUDE_H_
#define _XINCLUDE_H_

//include
#include <stdio.h>
#include <stdlib.h>  
#include <unistd.h>
#include <netinet/in.h>  
#include <sys/socket.h>  
#include <sys/types.h> 
#include <arpa/inet.h>
#include <pthread.h>
#include <errno.h> //errno
#include <time.h>

//include linux c
#include <signal.h> //SIG_ERR signal

//include c includer
#include <string.h> //strerror

//include c++ includer
#include <string>	//std::string
#include <sstream>	//std::stringstream
#include <iostream>
#include <set>
#include <list>
#include <map>
#include <queue>
#include <vector>	//std::vector

//using namespace
using namespace std;

//define
#define MAX_ROUND_VALUE 65530
#define MAX_RCV_SIZE 2000
#define MAX_SND_SIZE 2000

//message struct
typedef struct message_st
{
	void* 	m_pcBuf;//message buf
	int	m_iLen;  	//message length
	int m_iSize; 	//buf capacity size
        	
	message_st()
	{
		m_pcBuf = NULL;
		m_iLen	= 0;
		m_iSize = 0;
	}

	//message is valid
	inline bool IsValid()
	{
	    return (m_pcBuf != NULL) && (m_iLen > 0);
	}

	//allocate memory for message
	inline bool Malloc(int iSize)
	{
	    bool bRet = false;
		if (true == IsValid())
		{
			std::cerr<<"Warning:do't repeat to malloc message buf, because that this is a valid message"<<endl;
			return bRet;
		} 
	    if(NULL != m_pcBuf)	
		{
		    free(m_pcBuf);
			m_pcBuf = NULL;
		}
		m_iLen = 0;
 	    if (iSize == 0)
		{
			m_iSize = 0;
		}
		else if (iSize > 0)
		{
			m_pcBuf = (void*)malloc(iSize + 1);
			if (m_pcBuf == NULL)
			{
				fprintf(stderr, "Failed to %s\r\n", __FUNCTION__);
				return bRet;
			}
			memset(m_pcBuf, 0, iSize + 1);
			m_iSize = iSize + 1;
			bRet = true;
		}

		return bRet;
	}
	
	inline bool SetMsg(void* pVoid, int iMsgLen)
	{
		bool bRet = false;

		if (pVoid == NULL || NULL == m_pcBuf || iMsgLen > m_iSize)
		{
			fprintf(stderr, "Failed to %s\r\n", __FUNCTION__);
		}	
		else
		{
			memcpy (m_pcBuf, pVoid, iMsgLen);
			m_iLen = iMsgLen;
			bRet = true;
		}
		
		return bRet;
	}

	inline void Free()
	{
		if (NULL == m_pcBuf)
		{
			fprintf(stderr, "%s:%d:%s, Warning:don't repeat to free message buf.\r\n", __FILE__,__LINE__,__FUNCTION__);
		}
		else
		{
			free(m_pcBuf);
			m_pcBuf = NULL;
		}
		m_iLen = 0;
		m_iSize = 0;

		return;
	}

}message_st;

#endif
    xautomicQueue.h

#ifndef __ATOMICQUEUE_H_
#define __ATOMICQUEUE_H_

#include "./xinclude.h"

#if defined(__linux__)
namespace xman  
{
	class cxatomicQueue
	{
		private:
			class CGuard //pthread_mutex_t鎖的守護者類CGuard	
			{
				public:
					CGuard(pthread_mutex_t & mutex):m_mutex(mutex)
				{
					pthread_mutex_lock(&m_mutex);
				}
					~CGuard()
					{
						pthread_mutex_unlock(&m_mutex);
					}

				private:
					pthread_mutex_t	m_mutex;
			};

		private:
			cxatomicQueue();//禁止默認構造函數
			cxatomicQueue(cxatomicQueue const&);//禁止拷貝構造
			cxatomicQueue& operator=(cxatomicQueue const&);//禁止賦值構造

		public:
			/// 需要顯示調用此構造函數,阻止使用編譯器提供的默認構造函數
			explicit cxatomicQueue(long n) 	
			{
				//resize message list
				m_stMsg_list.resize(n);

				pthread_mutex_init(&m_mutex, 0);
			}

			~cxatomicQueue()
			{
				pthread_mutex_destroy(&m_mutex);
			}

			//PushBack + PopFront
			void PushBack(struct message_st stMsg);			
			int PopFront(struct message_st &stMsg);

		private:
			void resize();	

		private:
			//如果沒有mutable,operator long() const編譯會失敗
			//const成員函數operator long()里面是不能修改變量的值的
			//其第一行CGuard guard(m_mutex);的入參要求為非const類型
			//如果把m_mutex聲明為mutable類型,可以擺脫const成員函數的
			//不能修改變量的特性,
			//這么一來m_mutex通過mutable的修飾擺脫const成員函數的限制可以作為非const變量作為guard()的入參
			mutable pthread_mutex_t m_mutex;

			std::list<struct message_st>  m_stMsg_list;

	};//class cxatomicQueue

}//namespace xman
#endif //#if defined(__linux__)

#endif //ifndef _XATOMICQUEUE_H_
xatomicQueue.cpp文件:

#include "../Include/xatomicQueue.h"

namespace xman
{
	void cxatomicQueue::PushBack(struct message_st stMsg)
	{
		if (false == stMsg.IsValid())
		{
			return;
		}
		else
		{
			CGuard guard(m_mutex);
			this->resize();
			struct message_st msg;
			if (true == msg.Malloc(stMsg.m_iLen))
			{
				msg.SetMsg(stMsg.m_pcBuf, stMsg.m_iLen);
				m_stMsg_list.push_back(msg);
			}
		}

		return;
	}

	int cxatomicQueue::PopFront(struct message_st &outMsg)
	{
		CGuard guard(m_mutex);

		int iRet = -1;
		if(true == m_stMsg_list.empty())
		{
			return iRet;
		}

		struct message_st stMsg = m_stMsg_list.front(); 
		if(true == stMsg.IsValid())
		{
			if(outMsg.m_pcBuf != NULL && outMsg.m_iSize > stMsg.m_iLen)
			{
				memset(outMsg.m_pcBuf, 0, outMsg.m_iSize);
				memcpy(outMsg.m_pcBuf, stMsg.m_pcBuf, stMsg.m_iLen);
				outMsg.m_iLen = stMsg.m_iLen;
				iRet = 0;
			}
			else
			{
				fprintf(stderr, "Failed %s\r\n", __FUNCTION__);
			}
			stMsg.Free();	
		}
		m_stMsg_list.pop_front();

		return iRet;
	}

	//enlarge list's size
	void cxatomicQueue::resize()
	{
		std::size_t curSiz = m_stMsg_list.size();
		std::size_t maxSiz = m_stMsg_list.max_size();

		if (curSiz == maxSiz)
		{
			m_stMsg_list.resize( 2 * maxSiz );
			printf("%s:%d:%s, Warning:message list size update %zu to %zu.\r\n", __FILE__, __LINE__, __FUNCTION__, curSiz, 2*maxSiz);
		}

		return;
	}

}//namespace xman
使用原子隊列:

    在多線程各自的routine中使用原子消息隊列,就像單線程中使用普通隊列一樣就可以了。隊列的互斥使用已經由原子消息隊列內部互斥機制保證實現了。Easy,please enoy this nice queue.

void* thread_produce(void *pvoid)
{
	struct message_st stMsg;
	bool bRet = stMsg.Malloc(MAX_RCV_SIZE);

	while(bRet)
	{
		//produce a message
		char acBuf[] = "hello world.";
		stMsg.SetMsg(acBuf, strlen(acBuf));
		
		//push the message to queue
		g_msg_queue.PushBack(stMsg);
		printf("Push one message.\r\n");
		
		//renew message
		memset(stMsg.m_pcBuf, 0, MAX_RCV_SIZE);
	}

	return NULL;
}


void* thread_consume(void *pvoid)
{
	struct message_st stMsg;
	bool bRet = stMsg.Malloc(MAX_RCV_SIZE);

	while(bRet)
	{ 	
		//pop front message
		int iRet = g_msg_queue.PopFront(stMsg);
		if (iRet == 0)
		{
			printf("Read one message(%dBytes):{%s}\r\n", stMsg.m_iLen, (char*)stMsg.m_pcBuf);
		}
	}

	return NULL;
}

如果不使用原子隊列,那就需要在主進程中,創建一個公共消息隊列和此隊列的pthread_mutex_t對象g_queue_mutex,然后每個線程routine使用隊列前后都pthread_mutex_lock(&g_queue_mutex)和pthread_mutex_unlock(&g_queue_mutex)就可以了。   


這種方法如果使用不當,會產生死鎖現象。如果使用mutex時,產生了死鎖。以前的一篇文章有專門寫了定位死鎖的gdb腳本,很容易定位,跑一遍就ok,enoy~。


也可以用使用信號量semaphore配合mutex管理隊列。


(end)


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com