본문 바로가기
Linux Programming

프로세스 내에서 Thread 간 통신을 위한 메시지 큐 구현

by 감자최고 2022. 8. 27.

프로세스 간 메시지 통신은 System V (msgget, msgsnd, msgrcv) 나 POSIX (mq_open, mq_send, mq_receive)

 라이브러리로 구현 가능.

프로세스 내의 통신이라면 mutex 와 queue 를 이용하여 아래처럼 구현하면, kernel space 까지 데이터 이동이 없으므로 효율적일 것임

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <time.h> // clock()

#define LOGP(x, ...)	printf("[%ld][T0x04%lx] " x "\n", clock(), (pthread_self()&0xffff), ##__VA_ARGS__)

typedef struct st_elem {
	uint8_t *data;
	size_t	data_len; //data bytes under use
	struct st_elem *next;
} mymq_msg_elem;
	
typedef struct {
	long flags;
	long curmsgs;
	long maxmsg;
	long msgsize;
	
	pthread_mutex_t	mutex_send;
	pthread_mutex_t	mutex_recv;
	pthread_mutex_t	mutex_rw;

	mymq_msg_elem *msg_first;
	mymq_msg_elem *msg_last;
	mymq_msg_elem *freed_msg_first;
	mymq_msg_elem *freed_msg_last;
	uint8_t *data_buf_chunk;
	mymq_msg_elem *msg_elem_chunk;
} mymq_t;

int 
mymq_init (mymq_t *mymq, int maxmsg, int msgsize)
{
	int i;

	mymq->flags = 0;
	mymq->curmsgs = 0;
	mymq->maxmsg = maxmsg;
	mymq->msgsize = msgsize;

	mymq->data_buf_chunk = (uint8_t *)malloc(maxmsg*msgsize);
	mymq->msg_elem_chunk = (mymq_msg_elem *)malloc(sizeof(mymq_msg_elem)*maxmsg);
	
	for(i=0; i < (maxmsg-1); i++)
	{
		mymq->msg_elem_chunk[i].data = &mymq->data_buf_chunk[i*msgsize];
		mymq->msg_elem_chunk[i].data_len = 0;
		mymq->msg_elem_chunk[i].next = &mymq->msg_elem_chunk[i+1];
	}
	mymq->msg_elem_chunk[i].data = &mymq->data_buf_chunk[i*msgsize];
	mymq->msg_elem_chunk[i].next = NULL;

	mymq->msg_first = NULL;
	mymq->msg_last = NULL;
	mymq->freed_msg_first = &mymq->msg_elem_chunk[0]; 
	mymq->freed_msg_last = &mymq->msg_elem_chunk[maxmsg-1];

	pthread_mutex_init(&mymq->mutex_send, NULL);
	pthread_mutex_init(&mymq->mutex_recv, NULL);
	pthread_mutex_init(&mymq->mutex_rw, NULL);

	pthread_mutex_lock(&mymq->mutex_recv);
}

int
mymq_send (mymq_t *mymq, const char *msg_ptr, size_t msg_len, unsigned int msg_prio)
{
	mymq_msg_elem	*new_msg;

	LOGP("==> %s[Enter], cur=%ld/%ld", __func__, mymq->curmsgs, mymq->maxmsg);

	if(msg_len > mymq->msgsize) {
		printf("Tried to add too big message(%ld) expected (%ld).\n", msg_len, mymq->msgsize);
		return -1;
	}
	
	pthread_mutex_lock(&mymq->mutex_send);
	pthread_mutex_lock(&mymq->mutex_rw);
	
	LOGP("==> %s[Processing], cur=%ld", __func__, mymq->curmsgs);

	new_msg = mymq->freed_msg_first;
	mymq->freed_msg_first = mymq->freed_msg_first->next;
	new_msg->next = NULL;
	if(mymq->freed_msg_first == NULL)
		mymq->freed_msg_last = NULL;
	mymq->curmsgs ++;
	
	memcpy(new_msg->data, msg_ptr, msg_len);
	new_msg->data_len = msg_len;

	if(mymq->msg_first == NULL) {
		mymq->msg_first = new_msg;
		mymq->msg_last = new_msg;
	} else {
		mymq->msg_last->next = new_msg;
		mymq->msg_last = new_msg;
	}

	pthread_mutex_unlock(&mymq->mutex_rw);
	if(mymq->freed_msg_first != NULL) {
		pthread_mutex_unlock(&mymq->mutex_send);
	} else {
		LOGP("Not unlock mutex because of no freed_msg");
	}
	pthread_mutex_unlock(&mymq->mutex_recv);
}

ssize_t
mymq_receive (mymq_t *mymq, char *msg_ptr, size_t msg_len, unsigned int *msg_prio)
{
	mymq_msg_elem	*new_msg;

	LOGP("==> %s[Enter], cur=%ld/%ld", __func__, mymq->curmsgs, mymq->maxmsg);

	if(msg_len < mymq->msgsize) {
		printf("Tried to get with small buf(%ld) expected (%ld).\n", msg_len, mymq->msgsize);
		return -1;
	}
	
	pthread_mutex_lock(&mymq->mutex_recv);
	pthread_mutex_lock(&mymq->mutex_rw);
	
	LOGP("==> %s[Processing], cur=%ld", __func__, mymq->curmsgs);

	new_msg = mymq->msg_first;
	new_msg->next = NULL;
	mymq->msg_first = mymq->msg_first->next;
	if(mymq->msg_first == NULL)
		mymq->msg_last = NULL;
	mymq->curmsgs --;
	
	memcpy(msg_ptr, new_msg->data, new_msg->data_len);
	new_msg->data_len = 0;

	if(mymq->freed_msg_first == NULL) {
		mymq->freed_msg_first = new_msg;
		mymq->freed_msg_last = new_msg;
	} else {
		mymq->freed_msg_last->next = new_msg;
		mymq->freed_msg_last = new_msg;
	}

	pthread_mutex_unlock(&mymq->mutex_rw);
	if(mymq->msg_first != NULL)
		pthread_mutex_unlock(&mymq->mutex_recv);
	pthread_mutex_unlock(&mymq->mutex_send);
}

#define TYPES_OF_MSG	6
#define MSG_BUF_SIZE	20
char test_msg[TYPES_OF_MSG][MSG_BUF_SIZE];


void* 
task1 (void *vInfo)
{
	mymq_t	*pMyMQ = (mymq_t *)vInfo;
	uint8_t	buf[MSG_BUF_SIZE];
	pthread_t	tid;

	tid = pthread_self();

	LOGP("Started");

	//LOGP("sleep %d sec", 10);
	//sleep(10);

	mymq_receive(pMyMQ, buf, MSG_BUF_SIZE, 0);

	LOGP("receved:%s\n", buf);
}

void
main (void)
{
	int			i;
	pthread_t	thread[3];
	mymq_t		mymq;

	LOGP("Main Start");
	for(i = 0; i < TYPES_OF_MSG; i++)
		snprintf(test_msg[i], MSG_BUF_SIZE-1, "test msg#%d", i);

	sleep(1);
	LOGP("Main slept 1 sec");

	mymq_init(&mymq, 2, MSG_BUF_SIZE);

	for(i=0; i < 3; i++)
		pthread_create(&thread[i], NULL, task1, &mymq);

	sleep(1);
	for(i=0; i < TYPES_OF_MSG; i++)
	{
		LOGP("main trying to send %d", i);
		mymq_send(&mymq, test_msg[i], strlen(test_msg[i])+1, 0);
		LOGP("main sent %d", i);
		sleep(2);
	}
}

'Linux Programming' 카테고리의 다른 글

Log template  (0) 2022.09.28

댓글