Linux MQ接收与发送
参数说明:
// 消息队列的标志。目前,这个字段只用于设置或获取O_NONBLOCK标志,以指示消息队列是否应该以非阻塞模式打开
attr.mq_flags = 0;
// 队列中允许的最大消息数
attr.mq_maxmsg = 10;
// 每条消息的最大字节数
attr.mq_msgsize = MSG_SIZE;
// 当前在队列中的消息数(注意:这个字段在mq_open调用时应该被设置为0,因为它是只读属性,用于获取当前队列中的消息数量,而不是设置它)
attr.mq_curmsgs = 0;
代码如下:
#include <iostream>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <cstring>
#include <cerrno>
#include <chrono>
#include <thread>
#include <cstdlib>
#include <unistd.h>
#define MAX_SIZE 1024
void messageSender() {
char message[MAX_SIZE];
static int message_id = 0;
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = MAX_SIZE;
attr.mq_curmsgs = 0; // 注意:这个字段在mq_open时应该设置为0,因为它是只读的
mqd_t mq = mq_open("/queue_b", O_RDWR | O_CREAT | O_NONBLOCK, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
return;
}
while (true) {
snprintf(message, sizeof(message), "Hello from C++, ID: %d", message_id++);
// std::cout << "Message sent: " << message << std::endl;
if (mq_send(mq, message, strlen(message) + 1, 0) == -1) {
perror("mq_send");
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mq_close(mq);
mq_unlink("/queue_b"); // 删除消息队列
}
void messageReceiver() {
char buffer[MAX_SIZE];
ssize_t bytesRead;
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = MAX_SIZE;
attr.mq_curmsgs = 0;
mqd_t mq = mq_open("/queue_a", O_RDWR | O_CREAT | O_NONBLOCK, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
return;
}
while (true) {
bytesRead = mq_receive(mq, buffer, MAX_SIZE, NULL);
if (bytesRead != -1) {
buffer[bytesRead] = '\0';
std::cout << "Message received: " << buffer << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
mq_close(mq);
mq_unlink("/queue_a"); // 删除消息队列
}
int main() {
std::thread senderThread(messageSender);
std::thread receiverThread(messageReceiver);
senderThread.join();
receiverThread.join();
return 0;
}
编译命令:
g++ -o a a.cpp -lpthread -lrt