프로세스 간 메시지 통신은 System V(msgget, msgsnd, msgrcv)나 POSIX(mq_open, mq_send, mq_receive) 메시지 큐
API로 구현할 수 있다.
같은 프로세스 안의 스레드 간 통신이라면 아래처럼 mutex와 queue로 직접 구현하는 편이, 데이터를 kernel space로 복사할 필요가 없어 더 효율적일 것이다.
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <time.h> // clock()
/*
 * LOGP(fmt, ...): printf-style trace helper.
 *
 * Prefixes the message with the current clock() value and the low 16 bits of
 * the calling thread's id (zero-padded hex), and appends a newline.
 * `fmt` must be a string literal because it is concatenated with the prefix.
 *
 * The explicit casts make the arguments match the %ld / %lx conversions;
 * the pthread_t cast assumes pthread_t is an integer or pointer type.
 * The previous format "T0x04%lx" printed a literal "04" instead of padding.
 */
#define LOGP(x, ...) \
    printf("[%ld][T0x%04lx] " x "\n", (long)clock(), \
           (unsigned long)pthread_self() & 0xffffUL, ##__VA_ARGS__)
/*
 * One message slot of a mymq_t queue.
 * The same node type is linked either into the list of pending messages or
 * into the free list of unused slots.
 */
typedef struct st_elem {
/* Payload storage; mymq_init() points it into the queue's shared data buffer. */
uint8_t *data;
size_t data_len; // number of payload bytes currently stored in data; reset to 0 when the slot is released
/* Next node in whichever singly linked list this slot is on, NULL at the tail. */
struct st_elem *next;
} mymq_msg_elem;
/*
 * In-process message queue with a fixed number of fixed-size slots.
 * All storage is allocated once by mymq_init(); sending and receiving only
 * move slots between the pending list and the free list.
 */
typedef struct {
/* Set to 0 by mymq_init(); not read anywhere else in this file. */
long flags;
/* Number of messages currently queued (incremented by send, decremented by receive). */
long curmsgs;
/* Capacity of the queue in messages (number of slots). */
long maxmsg;
/* Maximum payload size of one message in bytes; receivers must supply at least this much buffer. */
long msgsize;
/* Gate for senders: mymq_send() takes it and leaves it locked when no free slot remains; mymq_receive() releases it. */
pthread_mutex_t mutex_send;
/* Gate for receivers: locked by mymq_init() while the queue is empty; mymq_send() releases it. */
pthread_mutex_t mutex_recv;
/* Protects the two lists and curmsgs while they are being modified. */
pthread_mutex_t mutex_rw;
/* Head and tail of the FIFO list of pending messages (both NULL when empty). */
mymq_msg_elem *msg_first;
mymq_msg_elem *msg_last;
/* Head and tail of the list of unused slots (both NULL when every slot is in use). */
mymq_msg_elem *freed_msg_first;
mymq_msg_elem *freed_msg_last;
/* Single allocation of maxmsg * msgsize bytes backing every slot's data pointer. */
uint8_t *data_buf_chunk;
/* Array of maxmsg slot nodes. */
mymq_msg_elem *msg_elem_chunk;
} mymq_t;
/*
 * Initialise a message queue with room for `maxmsg` messages of at most
 * `msgsize` bytes each.
 *
 * All slot storage is allocated here; every slot starts on the free list and
 * the pending list starts empty. mutex_recv is left locked so that receivers
 * block until the first message is sent.
 *
 * Returns 0 on success, -1 if an argument is invalid, memory cannot be
 * allocated, or a mutex cannot be initialised (nothing is leaked on failure).
 */
int
mymq_init (mymq_t *mymq, int maxmsg, int msgsize)
{
    size_t slots;
    size_t slot_size;
    size_t i;

    /* A queue with no slots would make the code below index element 0 of an empty array. */
    if (mymq == NULL || maxmsg <= 0 || msgsize <= 0)
        return -1;

    slots = (size_t)maxmsg;
    slot_size = (size_t)msgsize;

    mymq->flags = 0;
    mymq->curmsgs = 0;
    mymq->maxmsg = maxmsg;
    mymq->msgsize = msgsize;
    mymq->msg_first = NULL;
    mymq->msg_last = NULL;
    mymq->freed_msg_first = NULL;
    mymq->freed_msg_last = NULL;

    /* calloc() rejects a slots * slot_size product that would overflow. */
    mymq->data_buf_chunk = calloc(slots, slot_size);
    mymq->msg_elem_chunk = calloc(slots, sizeof *mymq->msg_elem_chunk);
    if (mymq->data_buf_chunk == NULL || mymq->msg_elem_chunk == NULL)
        goto fail_alloc;

    /* Chain every slot (including the last one) into the free list. */
    for (i = 0; i < slots; i++) {
        mymq_msg_elem *elem = &mymq->msg_elem_chunk[i];

        elem->data = &mymq->data_buf_chunk[i * slot_size];
        elem->data_len = 0;
        elem->next = (i + 1 < slots) ? &mymq->msg_elem_chunk[i + 1] : NULL;
    }
    mymq->freed_msg_first = &mymq->msg_elem_chunk[0];
    mymq->freed_msg_last = &mymq->msg_elem_chunk[slots - 1];

    if (pthread_mutex_init(&mymq->mutex_send, NULL) != 0)
        goto fail_alloc;
    if (pthread_mutex_init(&mymq->mutex_recv, NULL) != 0)
        goto fail_mutex_send;
    if (pthread_mutex_init(&mymq->mutex_rw, NULL) != 0)
        goto fail_mutex_recv;

    /* The queue is empty: hold the receive gate until mymq_send() releases it. */
    pthread_mutex_lock(&mymq->mutex_recv);
    return 0;

fail_mutex_recv:
    pthread_mutex_destroy(&mymq->mutex_recv);
fail_mutex_send:
    pthread_mutex_destroy(&mymq->mutex_send);
fail_alloc:
    free(mymq->msg_elem_chunk);
    free(mymq->data_buf_chunk);
    mymq->msg_elem_chunk = NULL;
    mymq->data_buf_chunk = NULL;
    mymq->freed_msg_first = NULL;
    mymq->freed_msg_last = NULL;
    return -1;
}
/*
 * Append a copy of `msg_len` bytes from `msg_ptr` to the queue.
 *
 * Blocks on mutex_send while no free slot is available.
 * `msg_prio` is accepted for mq_send()-style compatibility but is ignored:
 * messages are always delivered in FIFO order.
 *
 * Returns 0 on success, -1 if the message is larger than the queue's
 * message size or no free slot could be obtained.
 *
 * NOTE(review): mutex_send / mutex_recv act as gates that are released by a
 * different thread than the one that locked them. POSIX does not define that
 * for default mutexes; a semaphore or condition variable would be the
 * portable construct, but that requires changing mymq_t.
 */
int
mymq_send (mymq_t *mymq, const char *msg_ptr, size_t msg_len, unsigned int msg_prio)
{
    mymq_msg_elem *new_msg;
    int was_empty;

    (void)msg_prio;

    LOGP("==> %s[Enter], cur=%ld/%ld", __func__, mymq->curmsgs, mymq->maxmsg);
    if (msg_len > (size_t)mymq->msgsize) {
        printf("Tried to add too big message(%zu) expected (%ld).\n", msg_len, mymq->msgsize);
        return -1;
    }

    pthread_mutex_lock(&mymq->mutex_send);
    pthread_mutex_lock(&mymq->mutex_rw);
    LOGP("==> %s[Processing], cur=%ld", __func__, mymq->curmsgs);

    /* Take a slot from the free list. */
    new_msg = mymq->freed_msg_first;
    if (new_msg == NULL) {
        /*
         * No slot although the send gate was open. Keep mutex_send locked
         * (that is the "queue full" state a receiver will clear) and fail
         * instead of dereferencing NULL.
         */
        pthread_mutex_unlock(&mymq->mutex_rw);
        return -1;
    }
    mymq->freed_msg_first = new_msg->next;
    if (mymq->freed_msg_first == NULL)
        mymq->freed_msg_last = NULL;
    new_msg->next = NULL;

    if (msg_len > 0)
        memcpy(new_msg->data, msg_ptr, msg_len);
    new_msg->data_len = msg_len;

    /* Append the slot to the tail of the pending list. */
    was_empty = (mymq->msg_first == NULL);
    if (was_empty)
        mymq->msg_first = new_msg;
    else
        mymq->msg_last->next = new_msg;
    mymq->msg_last = new_msg;
    mymq->curmsgs++;

    /*
     * Decide about the gates while mutex_rw is still held, so the list state
     * the decision is based on cannot change underneath us.
     */
    if (mymq->freed_msg_first != NULL) {
        pthread_mutex_unlock(&mymq->mutex_send);
    } else {
        LOGP("Not unlock mutex because of no freed_msg");
    }
    /* The receive gate is only closed while the queue is empty; open it once. */
    if (was_empty)
        pthread_mutex_unlock(&mymq->mutex_recv);

    pthread_mutex_unlock(&mymq->mutex_rw);
    return 0;
}
/*
 * Remove the oldest message from the queue and copy it into `msg_ptr`.
 *
 * Blocks on mutex_recv while the queue is empty. `msg_len` is the size of the
 * caller's buffer and must be at least the queue's message size.
 * If `msg_prio` is not NULL it receives 0, because priorities are not
 * implemented.
 *
 * Returns the number of bytes copied, or -1 if the buffer is too small or no
 * message could be obtained.
 *
 * NOTE(review): mutex_send / mutex_recv act as gates that are released by a
 * different thread than the one that locked them. POSIX does not define that
 * for default mutexes; a semaphore or condition variable would be the
 * portable construct, but that requires changing mymq_t.
 */
ssize_t
mymq_receive (mymq_t *mymq, char *msg_ptr, size_t msg_len, unsigned int *msg_prio)
{
    mymq_msg_elem *msg;
    size_t copied;
    int was_full;

    LOGP("==> %s[Enter], cur=%ld/%ld", __func__, mymq->curmsgs, mymq->maxmsg);
    if (msg_len < (size_t)mymq->msgsize) {
        printf("Tried to get with small buf(%zu) expected (%ld).\n", msg_len, mymq->msgsize);
        return -1;
    }

    pthread_mutex_lock(&mymq->mutex_recv);
    pthread_mutex_lock(&mymq->mutex_rw);
    LOGP("==> %s[Processing], cur=%ld", __func__, mymq->curmsgs);

    msg = mymq->msg_first;
    if (msg == NULL) {
        /*
         * Nothing queued although the receive gate was open. Keep mutex_recv
         * locked (the "queue empty" state a sender will clear) and fail
         * instead of dereferencing NULL.
         */
        pthread_mutex_unlock(&mymq->mutex_rw);
        return -1;
    }

    /*
     * Advance the head BEFORE clearing msg->next. Clearing it first made the
     * head NULL on every receive, dropping all remaining messages.
     */
    mymq->msg_first = msg->next;
    if (mymq->msg_first == NULL)
        mymq->msg_last = NULL;
    msg->next = NULL;
    mymq->curmsgs--;

    copied = msg->data_len;
    if (copied > 0)
        memcpy(msg_ptr, msg->data, copied);
    msg->data_len = 0;
    if (msg_prio != NULL)
        *msg_prio = 0;

    /* Return the slot to the tail of the free list. */
    was_full = (mymq->freed_msg_first == NULL);
    if (was_full)
        mymq->freed_msg_first = msg;
    else
        mymq->freed_msg_last->next = msg;
    mymq->freed_msg_last = msg;

    /*
     * Decide about the gates while mutex_rw is still held, so the list state
     * the decision is based on cannot change underneath us.
     */
    if (mymq->msg_first != NULL)
        pthread_mutex_unlock(&mymq->mutex_recv);
    /* The send gate is only closed while no slot is free; open it once. */
    if (was_full)
        pthread_mutex_unlock(&mymq->mutex_send);

    pthread_mutex_unlock(&mymq->mutex_rw);
    return (ssize_t)copied;
}
#define TYPES_OF_MSG 6
#define MSG_BUF_SIZE 20
char test_msg[TYPES_OF_MSG][MSG_BUF_SIZE];
/*
 * Receiver thread entry point.
 *
 * `vInfo` is the mymq_t to read from. The thread receives exactly one
 * message, logs it and terminates. Always returns NULL.
 */
void*
task1 (void *vInfo)
{
    mymq_t *pMyMQ = (mymq_t *)vInfo;
    /*
     * char (not uint8_t) so the buffer matches mymq_receive()'s char *
     * parameter and the "%s" below; zero-filled so it is a valid string even
     * if nothing is copied into it.
     */
    char buf[MSG_BUF_SIZE] = {0};

    LOGP("Started");
    mymq_receive(pMyMQ, buf, MSG_BUF_SIZE, NULL);
    LOGP("receved:%s\n", buf);
    return NULL;
}
/*
 * Demo driver: starts three receiver threads on a two-slot queue and sends
 * TYPES_OF_MSG test messages, one every two seconds.
 *
 * NOTE(review): each task1 thread receives a single message, so with three
 * receivers and two slots the sixth send is expected to block once the queue
 * is full and no receiver is left. Confirm that this is the intended
 * demonstration of a blocking send.
 */
int
main (void)
{
    enum { NUM_RECEIVERS = 3 };
    pthread_t thread[NUM_RECEIVERS];
    int started = 0;
    mymq_t mymq;
    int i;

    LOGP("Main Start");
    for (i = 0; i < TYPES_OF_MSG; i++)
        snprintf(test_msg[i], sizeof test_msg[i], "test msg#%d", i);
    sleep(1);
    LOGP("Main slept 1 sec");

    mymq_init(&mymq, 2, MSG_BUF_SIZE);

    for (i = 0; i < NUM_RECEIVERS; i++) {
        if (pthread_create(&thread[started], NULL, task1, &mymq) != 0) {
            LOGP("pthread_create failed for receiver %d", i);
            continue;
        }
        started++;
    }
    /* Give the receivers time to start and block on the empty queue. */
    sleep(1);

    for (i = 0; i < TYPES_OF_MSG; i++) {
        LOGP("main trying to send %d", i);
        mymq_send(&mymq, test_msg[i], strlen(test_msg[i]) + 1, 0);
        LOGP("main sent %d", i);
        sleep(2);
    }

    /* Wait for the receivers so the process does not exit underneath them. */
    for (i = 0; i < started; i++)
        pthread_join(thread[i], NULL);
    return 0;
}
'Linux Programming' 카테고리의 다른 글
Log template (0) | 2022.09.28 |
---|
댓글