* NEW [dds] Optimize the process of mqtt client receive and the process of send msgs received to dds.

Signed-off-by: wanghaemq <wangwei@emqx.io>
This commit is contained in:
wanghaemq
2024-02-02 04:17:38 -05:00
committed by wangha
parent 2d14d95b9d
commit e3aad9e783
3 changed files with 27 additions and 46 deletions

View File

@ -558,7 +558,7 @@ dds_client(dds_cli *cli, mqtt_cli *mqttcli)
while (true) {
// If handle queue is not empty. Handle it first.
// Or we need to receive msgs from DDS in a NONBLOCK way and
// put it to the handle queue. Wait when handle queue is
// put it to the handle queue. Wait cv when handle queue is
// empty.
hd = NULL;
@ -606,7 +606,9 @@ dds_client(dds_cli *cli, mqtt_cli *mqttcli)
// Put to MQTTClient's handle queue
pthread_mutex_lock(&mqttcli->mtx);
nftp_vec_append(mqttcli->handleq, hd);
pthread_cond_signal(&mqttcli->cv);
pthread_mutex_unlock(&mqttcli->mtx);
log_dds("[DDS] Forward msg to mqtt, cnt %d", ++forward2mqtt_cnt);
break;
default:

View File

@ -251,23 +251,6 @@ static pthread_t recvthr;
static nftp_vec *rmsgq;
static pthread_mutex_t rmsgq_mtx;
// TODO
// It works in a NONBLOCK way
// Return 0 when got msg. return 1 when no msg; else errors happened
static int
client_recv(mqtt_cli *cli, nng_msg **msgp)
{
pthread_mutex_lock(&rmsgq_mtx);
if (nftp_vec_len(rmsgq) == 0) {
pthread_mutex_unlock(&rmsgq_mtx);
return 1;
}
nftp_vec_pop(rmsgq, (void **)msgp, NFTP_HEAD);
pthread_mutex_unlock(&rmsgq_mtx);
return 0;
}
static int
client_recv2(mqtt_cli *cli, nng_msg **msgp)
{
@ -303,21 +286,32 @@ mqtt_recv_loop(void *arg)
{
mqtt_cli *cli = arg;
nng_msg *msg;
while (cli->running) {
msg = NULL;
if (0 != client_recv2(cli, &msg))
continue;
pthread_mutex_lock(&rmsgq_mtx);
if (nftp_vec_len(rmsgq) * 4 == nftp_vec_cap(rmsgq)) {
uint32_t topicsz;
const char *topic = nng_mqtt_msg_get_publish_topic(msg, &topicsz);
char *srctopic = strndup(topic, topicsz);
handle *hd;
hd = mk_handle(HANDLE_TO_DDS, msg, 0, srctopic);
pthread_mutex_lock(&cli->mtx);
// watermark log
if (nftp_vec_len(cli->handleq) * 4 == nftp_vec_cap(cli->handleq)) {
log_dds("WARNING 1 / 4 of the queue from MQTT to DDS is used.");
} else if (nftp_vec_len(rmsgq) * 2 == nftp_vec_cap(rmsgq)) {
} else if (nftp_vec_len(cli->handleq) * 2 == nftp_vec_cap(cli->handleq)) {
log_dds("WARNING 1 / 2 of the queue from MQTT to DDS is used.");
} else if (nftp_vec_len(rmsgq) == nftp_vec_cap(rmsgq)) {
} else if (nftp_vec_len(cli->handleq) == nftp_vec_cap(cli->handleq)) {
log_dds("WARNING All of the queue from MQTT to DDS is used. Drop msg.");
}
nftp_vec_append(rmsgq, msg);
pthread_mutex_unlock(&rmsgq_mtx);
nftp_vec_append(cli->handleq, (void *) hd);
pthread_cond_signal(&cli->cv);
pthread_mutex_unlock(&cli->mtx);
}
return NULL;
@ -339,11 +333,13 @@ mqtt_loop(void *arg)
while (cli->running) {
// If handle queue is not empty. Handle it first.
// Or we need to receive msgs from nng in a NONBLOCK way and
// put it to the handle queue. Sleep when handle queue is
// put it to the handle queue. Wait cv when handle queue is
// empty.
hd = NULL;
pthread_mutex_lock(&cli->mtx);
while (nftp_vec_len(cli->handleq) == 0)
pthread_cond_wait(&cli->cv, &cli->mtx);
if (nftp_vec_len(cli->handleq))
nftp_vec_pop(cli->handleq, (void **) &hd, NFTP_HEAD);
pthread_mutex_unlock(&cli->mtx);
@ -351,28 +347,8 @@ mqtt_loop(void *arg)
if (hd)
goto work;
rv = client_recv(cli, &msg);
if (rv < 0) {
log_dds("Error in recv msg\n");
continue;
} else if (rv == 0) {
// Received msg and put to handleq
uint32_t topicsz;
const char *topic = nng_mqtt_msg_get_publish_topic(msg, &topicsz);
char *srctopic = strndup(topic, topicsz);
hd = mk_handle(HANDLE_TO_DDS, msg, 0, srctopic);
pthread_mutex_lock(&cli->mtx);
nftp_vec_append(cli->handleq, (void *) hd);
pthread_mutex_unlock(&cli->mtx);
continue;
}
// else No msgs available
// Sleep and continue
nng_msleep(200);
continue;
work:
switch (hd->type) {
case HANDLE_TO_DDS:
@ -436,6 +412,7 @@ mqtt_connect(mqtt_cli *cli, void *dc, dds_gateway_conf *config)
nftp_vec_alloc(&cli->handleq);
pthread_mutex_init(&cli->mtx, NULL);
pthread_cond_init(&cli->cv, NULL);
cli->ddscli = ddscli;
@ -458,6 +435,7 @@ mqtt_disconnect(mqtt_cli *cli)
if (cli->handleq)
nftp_vec_free(cli->handleq);
pthread_mutex_destroy(&cli->mtx);
pthread_cond_destroy(&cli->cv);
// XXX Remove the temparary rmsgq and its mtx
nftp_vec_free(rmsgq);

View File

@ -35,6 +35,7 @@ struct mqtt_cli {
nftp_vec *handleq;
pthread_mutex_t mtx;
pthread_cond_t cv;
// dds client
void *ddscli;