mirror of
https://github.com/nanomq/nanomq.git
synced 2025-06-22 01:57:27 +00:00
* MDF [fatal] replace private functions fatal() & nng_fatal() with the public ones from nanolib/utils.h;
This commit is contained in:
@ -30,6 +30,7 @@
|
||||
#include "nng/supplemental/nanolib/hash_table.h"
|
||||
#include "nng/supplemental/nanolib/mqtt_db.h"
|
||||
#include "nng/supplemental/nanolib/log.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
#include "include/acl_handler.h"
|
||||
#include "include/bridge.h"
|
||||
@ -153,13 +154,6 @@ intHandler(int dummy)
|
||||
}
|
||||
#endif
|
||||
|
||||
void
|
||||
fatal(const char *func, int rv)
|
||||
{
|
||||
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
static inline bool
|
||||
bridge_handler(nano_work *work)
|
||||
{
|
||||
@ -262,7 +256,7 @@ server_cb(void *arg)
|
||||
}
|
||||
}
|
||||
if ((msg = nng_aio_get_msg(work->aio)) == NULL) {
|
||||
fatal("RECV NULL MSG", rv);
|
||||
nng_fatal("RECV NULL MSG", rv);
|
||||
}
|
||||
if (work->proto == PROTO_MQTT_BRIDGE) {
|
||||
uint8_t type;
|
||||
@ -491,7 +485,7 @@ server_cb(void *arg)
|
||||
if (nng_msg_get_type(work->msg) == CMD_PUBLISH) {
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
log_error("WAIT nng aio result error: %d", rv);
|
||||
fatal("WAIT nng_ctx_recv/send", rv);
|
||||
nng_fatal("WAIT nng_ctx_recv/send", rv);
|
||||
}
|
||||
smsg = work->msg; // reuse the same msg
|
||||
cvector(mqtt_msg_info) msg_infos;
|
||||
@ -578,7 +572,7 @@ server_cb(void *arg)
|
||||
work->msg = NULL;
|
||||
}
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
fatal("SEND nng_ctx_send", rv);
|
||||
nng_fatal("SEND nng_ctx_send", rv);
|
||||
}
|
||||
if (work->pub_packet != NULL) {
|
||||
free_pub_packet(work->pub_packet);
|
||||
@ -604,7 +598,7 @@ server_cb(void *arg)
|
||||
if (nng_msg_get_type(work->msg) == CMD_PUBLISH) {
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
log_error("WAIT nng aio result error: %d", rv);
|
||||
fatal("WAIT nng_ctx_recv/send", rv);
|
||||
nng_fatal("WAIT nng_ctx_recv/send", rv);
|
||||
}
|
||||
smsg = work->msg; // reuse the same msg
|
||||
work->msg = NULL;
|
||||
@ -702,7 +696,7 @@ server_cb(void *arg)
|
||||
nng_ctx_recv(work->ctx, work->aio);
|
||||
break;
|
||||
default:
|
||||
fatal("bad state!", NNG_ESTATE);
|
||||
nng_fatal("bad state!", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -714,13 +708,13 @@ alloc_work(nng_socket sock)
|
||||
int rv;
|
||||
|
||||
if ((w = nng_alloc(sizeof(*w))) == NULL) {
|
||||
fatal("nng_alloc", NNG_ENOMEM);
|
||||
nng_fatal("nng_alloc", NNG_ENOMEM);
|
||||
}
|
||||
if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
|
||||
fatal("nng_aio_alloc", rv);
|
||||
nng_fatal("nng_aio_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
|
||||
w->pipe_ct = nng_alloc(sizeof(struct pipe_content));
|
||||
@ -747,29 +741,29 @@ proto_work_init(nng_socket sock,nng_socket inproc_sock, nng_socket bridge_sock,
|
||||
// only create ctx for extra ctx that are required to receive msg
|
||||
if (config->http_server.enable && proto == PROTO_HTTP_SERVER) {
|
||||
if ((rv = nng_ctx_open(&w->extra_ctx, inproc_sock)) != 0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
} else if (config->bridge_mode) {
|
||||
if (proto == PROTO_MQTT_BRIDGE) {
|
||||
if ((rv = nng_ctx_open(&w->extra_ctx, bridge_sock)) !=
|
||||
0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
} else if (proto == PROTO_AWS_BRIDGE) {
|
||||
if ((rv = nng_ctx_open(&w->extra_ctx, inproc_sock)) !=
|
||||
0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(config->web_hook.enable) {
|
||||
if ((rv = nng_push0_open(&w->webhook_sock)) != 0) {
|
||||
fatal("nng_socket", rv);
|
||||
nng_fatal("nng_socket", rv);
|
||||
}
|
||||
if ((rv = nng_dial(w->webhook_sock, WEB_HOOK_INPROC_URL, NULL,
|
||||
0)) != 0) {
|
||||
fatal("nng_dial", rv);
|
||||
nng_fatal("nng_dial", rv);
|
||||
}
|
||||
}
|
||||
|
||||
@ -866,7 +860,7 @@ broker(conf *nanomq_conf)
|
||||
sock.data = nanomq_conf;
|
||||
rv = nng_nmq_tcp0_open(&sock);
|
||||
if (rv != 0) {
|
||||
fatal("nng_nmq_tcp0_open", rv);
|
||||
nng_fatal("nng_nmq_tcp0_open", rv);
|
||||
}
|
||||
log_debug("listener init finished");
|
||||
|
||||
@ -877,7 +871,7 @@ broker(conf *nanomq_conf)
|
||||
log_debug("HTTP service initialization");
|
||||
rv = nng_rep0_open(&inproc_sock);
|
||||
if (rv != 0) {
|
||||
fatal("nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
// set 4 ctx for HTPP as default
|
||||
if (nanomq_conf->http_server.enable) {
|
||||
@ -983,7 +977,7 @@ broker(conf *nanomq_conf)
|
||||
|
||||
if (nanomq_conf->enable) {
|
||||
if ((rv = nano_listen(sock, nanomq_conf->url, NULL, 0, nanomq_conf)) != 0) {
|
||||
fatal("nng_listen", rv);
|
||||
nng_fatal("nng_listen", rv);
|
||||
}
|
||||
}
|
||||
|
||||
@ -991,7 +985,7 @@ broker(conf *nanomq_conf)
|
||||
if (nanomq_conf->websocket.enable) {
|
||||
if ((rv = nano_listen(
|
||||
sock, nanomq_conf->websocket.url, NULL, 0, nanomq_conf)) != 0) {
|
||||
fatal("nng_listen ws", rv);
|
||||
nng_fatal("nng_listen ws", rv);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1000,25 +994,25 @@ broker(conf *nanomq_conf)
|
||||
|
||||
if ((rv = nng_listener_create(
|
||||
&tls_listener, sock, nanomq_conf->tls.url)) != 0) {
|
||||
fatal("nng_listener_create tls", rv);
|
||||
nng_fatal("nng_listener_create tls", rv);
|
||||
}
|
||||
nng_listener_set(
|
||||
tls_listener, NANO_CONF, nanomq_conf, sizeof(nanomq_conf));
|
||||
|
||||
init_listener_tls(tls_listener, &nanomq_conf->tls);
|
||||
if ((rv = nng_listener_start(tls_listener, 0)) != 0) {
|
||||
fatal("nng_listener_start tls", rv);
|
||||
nng_fatal("nng_listener_start tls", rv);
|
||||
}
|
||||
// TODO websocket ssl
|
||||
// if (nanomq_conf->websocket.enable) {
|
||||
// nng_listener wss_listener;
|
||||
// if ((rv = nng_listener_create(&wss_listener, sock,
|
||||
// nanomq_conf->tls.url)) != 0) {
|
||||
// fatal("nng_listener_create wss", rv);
|
||||
// nng_fatal("nng_listener_create wss", rv);
|
||||
// }
|
||||
// init_listener_tls(wss_listener, &nanomq_conf->tls);
|
||||
// if ((rv = nng_listener_start(wss_listener, 0)) != 0) {
|
||||
// fatal("nng_listener_start wss", rv);
|
||||
// nng_fatal("nng_listener_start wss", rv);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
@ -1026,7 +1020,7 @@ broker(conf *nanomq_conf)
|
||||
if (nanomq_conf->http_server.enable || nanomq_conf->bridge_mode) {
|
||||
if ((rv = nano_listen(inproc_sock, INPROC_SERVER_URL, NULL, 0,
|
||||
nanomq_conf)) != 0) {
|
||||
fatal("nng_listen " INPROC_SERVER_URL, rv);
|
||||
nng_fatal("nng_listen " INPROC_SERVER_URL, rv);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1046,7 +1040,7 @@ broker(conf *nanomq_conf)
|
||||
/* Create the IPC socket for CMD Server. */
|
||||
rv = nng_rep0_open(&cmd_sock);
|
||||
if (rv != 0) {
|
||||
fatal("CMD socket ERROR: nng_rep0_open", rv);
|
||||
nng_fatal("CMD socket ERROR: nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
for (i = 0; i < CMD_PROC_PARALLEL; i++) {
|
||||
@ -1056,7 +1050,7 @@ broker(conf *nanomq_conf)
|
||||
if (nano_file_exists(IPC_URL_PATH))
|
||||
nng_file_delete(IPC_URL_PATH);
|
||||
if ((rv = nng_listen(cmd_sock, CMD_IPC_URL, NULL, 0)) != 0) {
|
||||
fatal("nng_listen", rv);
|
||||
nng_fatal("nng_listen", rv);
|
||||
}
|
||||
|
||||
for (i = 0; i < CMD_PROC_PARALLEL; i++) {
|
||||
@ -1585,7 +1579,7 @@ broker_start(int argc, char **argv)
|
||||
}
|
||||
#if defined(ENABLE_LOG)
|
||||
if ((rc = log_init(&nanomq_conf->log)) != 0) {
|
||||
fatal("log_init", rc);
|
||||
nng_fatal("log_init", rc);
|
||||
}
|
||||
#endif
|
||||
print_conf(nanomq_conf);
|
||||
|
@ -23,16 +23,11 @@
|
||||
#include "nng/nng.h"
|
||||
#include "nng/protocol/reqrep0/req.h"
|
||||
#include "nng/supplemental/nanolib/conf.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/log.h"
|
||||
#include "pub_handler.h"
|
||||
|
||||
#define fatal(msg, rv) \
|
||||
{ \
|
||||
printf("%s:%s\n", msg, nng_strerror(rv)); \
|
||||
exit(1); \
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief ALPN (Application-Layer Protocol Negotiation) protocol name for AWS
|
||||
* IoT MQTT.
|
||||
@ -460,12 +455,12 @@ mqtt_thread(void *arg)
|
||||
nng_socket req_sock;
|
||||
|
||||
if ((rv = nng_req0_open(&req_sock)) != 0) {
|
||||
fatal("nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_dial(
|
||||
req_sock, INPROC_SERVER_URL, NULL, NNG_FLAG_NONBLOCK)) != 0) {
|
||||
fatal("nng_dial " INPROC_SERVER_URL, rv);
|
||||
nng_fatal("nng_dial " INPROC_SERVER_URL, rv);
|
||||
}
|
||||
|
||||
conf_bridge_node * node = (conf_bridge_node *) arg;
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "nng/protocol/mqtt/mqtt.h"
|
||||
#include "nng/supplemental/nanolib/log.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
@ -23,12 +24,6 @@ static property *conn_property(conf_bridge_conn_properties *conf_prop);
|
||||
|
||||
static nng_thread *hybridger_thr;
|
||||
|
||||
static void
|
||||
fatal(const char *func, int rv)
|
||||
{
|
||||
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
|
||||
}
|
||||
|
||||
static int
|
||||
apply_sqlite_config(
|
||||
nng_socket *sock, conf_bridge_node *config, const char *db_name)
|
||||
@ -38,7 +33,7 @@ apply_sqlite_config(
|
||||
// create sqlite option
|
||||
nng_mqtt_sqlite_option *opt;
|
||||
if ((rv = nng_mqtt_alloc_sqlite_opt(&opt)) != 0) {
|
||||
fatal("nng_mqtt_alloc_sqlite_opt", rv);
|
||||
nng_fatal("nng_mqtt_alloc_sqlite_opt", rv);
|
||||
}
|
||||
|
||||
nng_mqtt_set_sqlite_conf(opt, config);
|
||||
@ -314,12 +309,12 @@ hybrid_bridge_tcp_client(bridge_param *bridge_arg)
|
||||
|
||||
if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
|
||||
if ((rv = nng_mqttv5_client_open(sock)) != 0) {
|
||||
fatal("nng_mqttv5_client_open", rv);
|
||||
nng_fatal("nng_mqttv5_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
} else {
|
||||
if ((rv = nng_mqtt_client_open(sock)) != 0) {
|
||||
fatal("nng_mqtt_client_open", rv);
|
||||
nng_fatal("nng_mqtt_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
@ -327,7 +322,7 @@ hybrid_bridge_tcp_client(bridge_param *bridge_arg)
|
||||
apply_sqlite_config(sock, node, "mqtt_client.db");
|
||||
|
||||
if ((rv = nng_dialer_create(&dialer, *sock, node->address))) {
|
||||
fatal("nng_dialer_create", rv);
|
||||
nng_fatal("nng_dialer_create", rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -335,7 +330,7 @@ hybrid_bridge_tcp_client(bridge_param *bridge_arg)
|
||||
if (node->tls.enable) {
|
||||
if ((rv = init_dialer_tls(dialer, node->tls.ca, node->tls.cert,
|
||||
node->tls.key, node->tls.key_password)) != 0) {
|
||||
fatal("init_dialer_tls", rv);
|
||||
nng_fatal("init_dialer_tls", rv);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
@ -387,12 +382,12 @@ bridge_tcp_client(nng_socket *sock, conf *config, conf_bridge_node *node)
|
||||
|
||||
if (node->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
|
||||
if ((rv = nng_mqttv5_client_open(sock)) != 0) {
|
||||
fatal("nng_mqttv5_client_open", rv);
|
||||
nng_fatal("nng_mqttv5_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
} else {
|
||||
if ((rv = nng_mqtt_client_open(sock)) != 0) {
|
||||
fatal("nng_mqtt_client_open", rv);
|
||||
nng_fatal("nng_mqtt_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
@ -400,7 +395,7 @@ bridge_tcp_client(nng_socket *sock, conf *config, conf_bridge_node *node)
|
||||
apply_sqlite_config(sock, node, "mqtt_client.db");
|
||||
|
||||
if ((rv = nng_dialer_create(&dialer, *sock, node->address))) {
|
||||
fatal("nng_dialer_create", rv);
|
||||
nng_fatal("nng_dialer_create", rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -408,7 +403,7 @@ bridge_tcp_client(nng_socket *sock, conf *config, conf_bridge_node *node)
|
||||
if (node->tls.enable) {
|
||||
if ((rv = init_dialer_tls(dialer, node->tls.ca, node->tls.cert,
|
||||
node->tls.key, node->tls.key_password)) != 0) {
|
||||
fatal("init_dialer_tls", rv);
|
||||
nng_fatal("init_dialer_tls", rv);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
@ -604,7 +599,7 @@ hybrid_bridge_quic_client(bridge_param *bridge_arg)
|
||||
|
||||
// keepalive here is for QUIC only
|
||||
if ((rv = nng_mqtt_quic_open_keepalive(sock, node->address, (void *)node)) != 0) {
|
||||
fatal("nng_mqtt_quic_client_open", rv);
|
||||
nng_fatal("nng_mqtt_quic_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
// TODO mqtt v5 protocol
|
||||
@ -655,7 +650,7 @@ bridge_quic_client(nng_socket *sock, conf *config, conf_bridge_node *node)
|
||||
|
||||
// keepalive here is for QUIC only
|
||||
if ((rv = nng_mqtt_quic_open_keepalive(sock, node->address, (void *)node)) != 0) {
|
||||
fatal("nng_mqtt_quic_client_open", rv);
|
||||
nng_fatal("nng_mqtt_quic_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
// mqtt v5 protocol
|
||||
@ -737,12 +732,12 @@ hybridger_cb(void *arg)
|
||||
|
||||
int rv = nng_mtx_alloc(&bridge_arg->switch_mtx);
|
||||
if (rv != 0) {
|
||||
fatal("nng_mtx_alloc", rv);
|
||||
nng_fatal("nng_mtx_alloc", rv);
|
||||
return;
|
||||
}
|
||||
rv = nng_cv_alloc(&bridge_arg->switch_cv, bridge_arg->switch_mtx);
|
||||
if (rv != 0) {
|
||||
fatal("nng_cv_alloc", rv);
|
||||
nng_fatal("nng_cv_alloc", rv);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -808,18 +803,18 @@ hybrid_bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node)
|
||||
|
||||
int rv = nng_mtx_alloc(&bridge_arg->exec_mtx);
|
||||
if (rv != 0) {
|
||||
fatal("nng_mtx_alloc", rv);
|
||||
nng_fatal("nng_mtx_alloc", rv);
|
||||
return rv;
|
||||
}
|
||||
rv = nng_cv_alloc(&bridge_arg->exec_cv, bridge_arg->exec_mtx);
|
||||
if (rv != 0) {
|
||||
fatal("nng_cv_alloc", rv);
|
||||
nng_fatal("nng_cv_alloc", rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
rv = nng_thread_create(&hybridger_thr, hybridger_cb, (void *)bridge_arg);
|
||||
if (rv != 0) {
|
||||
fatal("nng_thread_create", rv);
|
||||
nng_fatal("nng_thread_create", rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -874,7 +869,7 @@ bridge_client(nng_socket *sock, conf *config, conf_bridge_node *node)
|
||||
num++) {
|
||||
if ((rv = nng_aio_alloc(
|
||||
&node->bridge_aio[num], bridge_send_cb, node)) != 0) {
|
||||
fatal("bridge_aio nng_aio_alloc", rv);
|
||||
nng_fatal("bridge_aio nng_aio_alloc", rv);
|
||||
}
|
||||
log_debug("parallel %d", num);
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include "nng/protocol/reqrep0/rep.h"
|
||||
#include "nng/protocol/reqrep0/req.h"
|
||||
#include "nng/supplemental/nanolib/cJSON.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
|
||||
struct cmd_work {
|
||||
@ -18,13 +19,6 @@ struct cmd_work {
|
||||
conf * config;
|
||||
};
|
||||
|
||||
static void
|
||||
fatal(const char *func, int rv)
|
||||
{
|
||||
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
static int
|
||||
handle_recv(const char *msg, size_t msg_len, conf *config, char **err_msg)
|
||||
{
|
||||
@ -92,7 +86,7 @@ cmd_server_cb(void *arg)
|
||||
break;
|
||||
case RECV:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
fatal("nng_recv_aio", rv);
|
||||
nng_fatal("nng_recv_aio", rv);
|
||||
}
|
||||
msg = nng_aio_get_msg(work->aio);
|
||||
char *cmd = (char *) nng_msg_body(msg);
|
||||
@ -119,14 +113,14 @@ cmd_server_cb(void *arg)
|
||||
case SEND:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
nng_msg_free(work->msg);
|
||||
fatal("nng_send_aio", rv);
|
||||
nng_fatal("nng_send_aio", rv);
|
||||
}
|
||||
work->state = RECV;
|
||||
nng_ctx_recv(work->ctx, work->aio);
|
||||
break;
|
||||
|
||||
default:
|
||||
fatal("bad state!", NNG_ESTATE);
|
||||
nng_fatal("bad state!", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -138,13 +132,13 @@ alloc_cmd_work(nng_socket sock, conf *config)
|
||||
int rv;
|
||||
|
||||
if ((w = nng_alloc(sizeof(*w))) == NULL) {
|
||||
fatal("nng_alloc", NNG_ENOMEM);
|
||||
nng_fatal("nng_alloc", NNG_ENOMEM);
|
||||
}
|
||||
if ((rv = nng_aio_alloc(&w->aio, cmd_server_cb, w)) != 0) {
|
||||
fatal("nng_aio_alloc", rv);
|
||||
nng_fatal("nng_aio_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
w->config = config;
|
||||
w->state = INIT;
|
||||
@ -166,7 +160,7 @@ server(void *arg)
|
||||
/* Create the socket. */
|
||||
rv = nng_rep0_open(&sock);
|
||||
if (rv != 0) {
|
||||
fatal("nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
for (i = 0; i < CMD_PROC_PARALLEL; i++) {
|
||||
@ -174,7 +168,7 @@ server(void *arg)
|
||||
}
|
||||
|
||||
if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
|
||||
fatal("nng_listen", rv);
|
||||
nng_fatal("nng_listen", rv);
|
||||
}
|
||||
|
||||
for (i = 0; i < CMD_PROC_PARALLEL; i++) {
|
||||
@ -206,20 +200,20 @@ client(const char *cmd)
|
||||
const char *url = CMD_IPC_URL;
|
||||
|
||||
if ((rv = nng_req0_open(&sock)) != 0) {
|
||||
fatal("nng_req0_open", rv);
|
||||
nng_fatal("nng_req0_open", rv);
|
||||
}
|
||||
if ((rv = nng_dialer_create(&dialer, sock, url)) != 0) {
|
||||
fatal("nng_dialer_create", rv);
|
||||
nng_fatal("nng_dialer_create", rv);
|
||||
}
|
||||
nng_socket_set_ms(sock, NNG_OPT_REQ_RESENDTIME, 2000);
|
||||
nng_dialer_start(dialer, NNG_FLAG_ALLOC);
|
||||
|
||||
if ((rv = nng_send(sock, (void *) cmd, strlen(cmd) + 1, 0)) != 0) {
|
||||
fatal("nng_send", rv);
|
||||
nng_fatal("nng_send", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
|
||||
fatal("nng_recv", rv);
|
||||
nng_fatal("nng_recv", rv);
|
||||
}
|
||||
if (sz > 0) {
|
||||
printf("%.*s\n", (int) sz, (const char *) buf);
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "nng/supplemental/nanolib/cvector.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
#include "include/nanomq.h"
|
||||
|
||||
@ -47,12 +48,6 @@ static int finish_with_error(MYSQL *con, rule *rules, int index)
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void
|
||||
fatal(const char *func, int rv)
|
||||
{
|
||||
log_error("%s", nng_strerror(rv));
|
||||
}
|
||||
|
||||
int
|
||||
nano_client_publish(nng_socket *sock, const char *topic, uint8_t *payload,
|
||||
uint32_t len, uint8_t qos, property *props)
|
||||
@ -102,18 +97,18 @@ nano_client(nng_socket *sock, repub_t *repub)
|
||||
|
||||
if (repub->proto_ver == MQTT_PROTOCOL_VERSION_v5) {
|
||||
if ((rv = nng_mqttv5_client_open(sock)) != 0) {
|
||||
fatal("nng_mqttv5_client_open", rv);
|
||||
nng_fatal("nng_mqttv5_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
} else {
|
||||
if ((rv = nng_mqtt_client_open(sock)) != 0) {
|
||||
fatal("nng_mqtt_client_open", rv);
|
||||
nng_fatal("nng_mqtt_client_open", rv);
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
if ((rv = nng_dialer_create(&dialer, *sock, repub->address))) {
|
||||
fatal("nng_dialer_create", rv);
|
||||
nng_fatal("nng_dialer_create", rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "nng/supplemental/http/http.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/log.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
#include "include/nanomq.h"
|
||||
#include "include/rest_api.h"
|
||||
@ -24,12 +25,6 @@
|
||||
#include "include/web_server.h"
|
||||
// #include "utils/log.h"
|
||||
|
||||
#define fatal(msg, rv) \
|
||||
{ \
|
||||
printf("%s:%s\n", msg, nng_strerror(rv)); \
|
||||
exit(1); \
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
SEND_REQ, // Sending REQ request
|
||||
RECV_REP, // Receiving REQ reply
|
||||
@ -193,7 +188,7 @@ rest_job_cb(void *arg)
|
||||
rest_recycle_job(job);
|
||||
return;
|
||||
default:
|
||||
fatal("bad case", NNG_ESTATE);
|
||||
nng_fatal("bad case", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -257,7 +252,7 @@ rest_start(uint16_t port)
|
||||
int rv;
|
||||
|
||||
if ((rv = nng_mtx_alloc(&job_lock)) != 0) {
|
||||
fatal("nng_mtx_alloc", rv);
|
||||
nng_fatal("nng_mtx_alloc", rv);
|
||||
}
|
||||
job_freelist = NULL;
|
||||
|
||||
@ -265,38 +260,38 @@ rest_start(uint16_t port)
|
||||
// from the argument list.
|
||||
snprintf(rest_addr, sizeof(rest_addr), REST_URL, port);
|
||||
if ((rv = nng_url_parse(&url, rest_addr)) != 0) {
|
||||
fatal("nng_url_parse", rv);
|
||||
nng_fatal("nng_url_parse", rv);
|
||||
}
|
||||
|
||||
// Create the REQ socket, and put it in raw mode, connected to
|
||||
// the remote REP server (our inproc server in this case).
|
||||
if ((rv = nng_req0_open(&req_sock)) != 0) {
|
||||
fatal("nng_req0_open", rv);
|
||||
nng_fatal("nng_req0_open", rv);
|
||||
}
|
||||
if ((rv = nng_dial(req_sock, INPROC_URL, NULL, NNG_FLAG_NONBLOCK)) !=
|
||||
0) {
|
||||
fatal("nng_dial(" INPROC_URL ")", rv);
|
||||
nng_fatal("nng_dial(" INPROC_URL ")", rv);
|
||||
}
|
||||
|
||||
// Get a suitable HTTP server instance. This creates one
|
||||
// if it doesn't already exist.
|
||||
if ((rv = nng_http_server_hold(&server, url)) != 0) {
|
||||
fatal("nng_http_server_hold", rv);
|
||||
nng_fatal("nng_http_server_hold", rv);
|
||||
}
|
||||
|
||||
// Allocate the handler - we use a dynamic handler for REST
|
||||
// using the function "rest_handle" declared above.
|
||||
rv = nng_http_handler_alloc(&handler, url->u_path, rest_handle);
|
||||
if (rv != 0) {
|
||||
fatal("nng_http_handler_alloc", rv);
|
||||
nng_fatal("nng_http_handler_alloc", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_set_tree(handler)) != 0) {
|
||||
fatal("nng_http_handler_set_tree", rv);
|
||||
nng_fatal("nng_http_handler_set_tree", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_set_method(handler, NULL)) != 0) {
|
||||
fatal("nng_http_handler_set_method", rv);
|
||||
nng_fatal("nng_http_handler_set_method", rv);
|
||||
}
|
||||
|
||||
// We want to collect the body, and we (arbitrarily) limit this to
|
||||
@ -306,32 +301,32 @@ rest_start(uint16_t port)
|
||||
// chunked transfers.
|
||||
if ((rv = nng_http_handler_collect_body(handler, true, 1024 * 128)) !=
|
||||
0) {
|
||||
fatal("nng_http_handler_collect_body", rv);
|
||||
nng_fatal("nng_http_handler_collect_body", rv);
|
||||
}
|
||||
|
||||
rv = nng_http_handler_alloc_directory(&handler_file, "", "./dist");
|
||||
if (rv != 0) {
|
||||
fatal("nng_http_handler_alloc_file", rv);
|
||||
nng_fatal("nng_http_handler_alloc_file", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_set_method(handler_file, "GET")) != 0) {
|
||||
fatal("nng_http_handler_set_method", rv);
|
||||
nng_fatal("nng_http_handler_set_method", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_collect_body(handler_file, true, 1024)) !=
|
||||
0) {
|
||||
fatal("nng_http_handler_collect_body", rv);
|
||||
nng_fatal("nng_http_handler_collect_body", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_server_add_handler(server, handler_file)) != 0) {
|
||||
fatal("nng_http_handler_add_handler", rv);
|
||||
nng_fatal("nng_http_handler_add_handler", rv);
|
||||
}
|
||||
if ((rv = nng_http_server_add_handler(server, handler)) != 0) {
|
||||
fatal("nng_http_handler_add_handler", rv);
|
||||
nng_fatal("nng_http_handler_add_handler", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_server_start(server)) != 0) {
|
||||
fatal("nng_http_server_start", rv);
|
||||
nng_fatal("nng_http_server_start", rv);
|
||||
}
|
||||
|
||||
nng_url_free(url);
|
||||
@ -348,16 +343,16 @@ inproc_server(void *arg)
|
||||
|
||||
int rv;
|
||||
if ((rv = nng_rep0_open(&sock)) != 0) {
|
||||
fatal("nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_req0_open(&req_sock)) != 0) {
|
||||
fatal("nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_dial(req_sock, INPROC_SERVER_URL, NULL,
|
||||
NNG_FLAG_NONBLOCK)) != 0) {
|
||||
fatal("nng_dial " INPROC_SERVER_URL, rv);
|
||||
nng_fatal("nng_dial " INPROC_SERVER_URL, rv);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < rest_conf->parallel; i++) {
|
||||
@ -367,7 +362,7 @@ inproc_server(void *arg)
|
||||
}
|
||||
|
||||
if ((rv = nng_listen(sock, INPROC_URL, NULL, 0)) != 0) {
|
||||
fatal("nng_listen", rv);
|
||||
nng_fatal("nng_listen", rv);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < rest_conf->parallel; i++) {
|
||||
@ -399,7 +394,7 @@ inproc_cb(void *arg)
|
||||
|
||||
case SRV_RECV:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
fatal("nng_ctx_recv", rv);
|
||||
nng_fatal("nng_ctx_recv", rv);
|
||||
}
|
||||
|
||||
msg = nng_aio_get_msg(work->aio);
|
||||
@ -421,14 +416,14 @@ inproc_cb(void *arg)
|
||||
case SRV_SEND:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
nng_msg_free(work->msg);
|
||||
fatal("nng_ctx_send", rv);
|
||||
nng_fatal("nng_ctx_send", rv);
|
||||
}
|
||||
work->state = SRV_RECV;
|
||||
nng_ctx_recv(work->ctx, work->aio);
|
||||
break;
|
||||
|
||||
default:
|
||||
fatal("bad state!", NNG_ESTATE);
|
||||
nng_fatal("bad state!", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -440,13 +435,13 @@ alloc_work(nng_socket sock, conf_http_server *conf)
|
||||
int rv;
|
||||
|
||||
if ((w = nng_alloc(sizeof(*w))) == NULL) {
|
||||
fatal("nng_alloc", NNG_ENOMEM);
|
||||
nng_fatal("nng_alloc", NNG_ENOMEM);
|
||||
}
|
||||
if ((rv = nng_aio_alloc(&w->aio, inproc_cb, w)) != 0) {
|
||||
fatal("nng_aio_alloc", rv);
|
||||
nng_fatal("nng_aio_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
w->conf = conf;
|
||||
w->state = SRV_INIT;
|
||||
@ -507,7 +502,7 @@ start_rest_server(conf *conf)
|
||||
int rv;
|
||||
rv = nng_thread_create(&inproc_thr, inproc_server, &conf->http_server);
|
||||
if (rv != 0) {
|
||||
fatal("cannot start inproc server", rv);
|
||||
nng_fatal("cannot start inproc server", rv);
|
||||
}
|
||||
|
||||
uint16_t port = conf->http_server.port ? conf->http_server.port
|
||||
|
@ -19,8 +19,9 @@
|
||||
#include "nng/protocol/pipeline0/push.h"
|
||||
#include "nng/supplemental/http/http.h"
|
||||
#include "nng/supplemental/nanolib/conf.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/log.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
|
||||
#define NANO_LMQ_INIT_CAP 16
|
||||
|
||||
@ -44,13 +45,6 @@ static void webhook_cb(void *arg);
|
||||
|
||||
static nng_thread *inproc_thr;
|
||||
|
||||
static void
|
||||
fatal(uint32_t id, const char *func, int rv)
|
||||
{
|
||||
fprintf(stderr, "[%d] %s: %s\n", id, func, nng_strerror(rv));
|
||||
// exit(1);
|
||||
}
|
||||
|
||||
static void
|
||||
send_msg(conf_web_hook *conf, nng_msg *msg)
|
||||
{
|
||||
@ -175,7 +169,7 @@ webhook_cb(void *arg)
|
||||
|
||||
case HOOK_RECV:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
fatal(work->id, "nng_recv_aio", rv);
|
||||
nng_fatal("nng_recv_aio", rv);
|
||||
}
|
||||
work->msg = nng_aio_get_msg(work->aio);
|
||||
nng_mtx_lock(work->mtx);
|
||||
@ -183,7 +177,7 @@ webhook_cb(void *arg)
|
||||
size_t lmq_cap = nng_lmq_cap(work->lmq);
|
||||
if ((rv = nng_lmq_resize(
|
||||
work->lmq, lmq_cap + (lmq_cap / 2))) != 0) {
|
||||
fatal(work->id, "nng_lmq_resize", rv);
|
||||
nng_fatal("nng_lmq_resize", rv);
|
||||
}
|
||||
}
|
||||
nng_lmq_put(work->lmq, work->msg);
|
||||
@ -194,7 +188,7 @@ webhook_cb(void *arg)
|
||||
break;
|
||||
|
||||
default:
|
||||
fatal(work->id, "bad state!", NNG_ESTATE);
|
||||
nng_fatal("bad state!", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -206,19 +200,19 @@ alloc_work(nng_socket sock, conf_web_hook *conf)
|
||||
int rv;
|
||||
|
||||
if ((w = nng_alloc(sizeof(*w))) == NULL) {
|
||||
fatal(w->id, "nng_alloc", NNG_ENOMEM);
|
||||
nng_fatal("nng_alloc", NNG_ENOMEM);
|
||||
}
|
||||
if ((rv = nng_aio_alloc(&w->aio, webhook_cb, w)) != 0) {
|
||||
fatal(w->id, "nng_aio_alloc", rv);
|
||||
nng_fatal("nng_aio_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_mtx_alloc(&w->mtx)) != 0) {
|
||||
fatal(w->id, "nng_mtx_alloc", rv);
|
||||
nng_fatal("nng_mtx_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_lmq_alloc(&w->lmq, NANO_LMQ_INIT_CAP) != 0)) {
|
||||
fatal(w->id, "nng_lmq_alloc", rv);
|
||||
nng_fatal("nng_lmq_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_thread_create(&w->thread, thread_cb, w)) != 0) {
|
||||
fatal(w->id, "nng_thread_create", rv);
|
||||
nng_fatal("nng_thread_create", rv);
|
||||
}
|
||||
|
||||
w->conf = conf;
|
||||
@ -232,8 +226,8 @@ alloc_work(nng_socket sock, conf_web_hook *conf)
|
||||
void
|
||||
webhook_thr(void *arg)
|
||||
{
|
||||
conf * conf = arg;
|
||||
nng_socket sock;
|
||||
conf * conf = arg;
|
||||
nng_socket sock;
|
||||
struct hook_work **works =
|
||||
nng_zalloc(conf->web_hook.pool_size * sizeof(struct hook_work *));
|
||||
|
||||
@ -243,7 +237,7 @@ webhook_thr(void *arg)
|
||||
/* Create the socket. */
|
||||
rv = nng_pull0_open(&sock);
|
||||
if (rv != 0) {
|
||||
fatal(0, "nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
for (i = 0; i < conf->web_hook.pool_size; i++) {
|
||||
@ -252,7 +246,7 @@ webhook_thr(void *arg)
|
||||
}
|
||||
|
||||
if ((rv = nng_listen(sock, WEB_HOOK_INPROC_URL, NULL, 0)) != 0) {
|
||||
fatal(0, "nng_listen", rv);
|
||||
nng_fatal("nng_listen", rv);
|
||||
}
|
||||
|
||||
for (i = 0; i < conf->web_hook.pool_size; i++) {
|
||||
@ -274,7 +268,7 @@ start_webhook_service(conf *conf)
|
||||
{
|
||||
int rv = nng_thread_create(&inproc_thr, webhook_thr, conf);
|
||||
if (rv != 0) {
|
||||
fatal(0, "nng_thread_create", rv);
|
||||
nng_fatal("nng_thread_create", rv);
|
||||
}
|
||||
nng_msleep(500);
|
||||
return rv;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <nng/supplemental/tls/tls.h>
|
||||
#include <nng/supplemental/util/options.h>
|
||||
#include <nng/supplemental/util/platform.h>
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
#include <stdarg.h>
|
||||
|
||||
#ifndef PARALLEL
|
||||
@ -58,22 +59,6 @@ bench_count_init(bench_statistics *bs)
|
||||
nng_atomic_alloc(&bs->index_cnt);
|
||||
}
|
||||
|
||||
static void
|
||||
fatal(const char *msg, ...)
|
||||
{
|
||||
va_list ap;
|
||||
va_start(ap, msg);
|
||||
vfprintf(stderr, msg, ap);
|
||||
va_end(ap);
|
||||
fprintf(stderr, "\n");
|
||||
}
|
||||
|
||||
void
|
||||
nng_fatal(const char *msg, int rv)
|
||||
{
|
||||
fatal("%s: %s\n", msg, nng_strerror(rv));
|
||||
}
|
||||
|
||||
static int
|
||||
init_dialer_tls(nng_dialer d, const char *cacert, const char *cert,
|
||||
const char *key, const char *pass)
|
||||
|
@ -17,10 +17,12 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <nng/mqtt/mqtt_client.h>
|
||||
#include <nng/nng.h>
|
||||
#include <nng/supplemental/util/options.h>
|
||||
#include <nng/supplemental/util/platform.h>
|
||||
#include "nng/mqtt/mqtt_client.h"
|
||||
#include "nng/nng.h"
|
||||
#include "nng/supplemental/util/options.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
|
||||
#if defined(SUPP_CLIENT)
|
||||
|
||||
@ -31,7 +33,6 @@ static int init_dialer_tls(nng_dialer d, const char *cacert, const char *cert,
|
||||
#endif
|
||||
|
||||
static void loadfile(const char *path, void **datap, size_t *lenp);
|
||||
static void fatal(const char *msg, ...);
|
||||
|
||||
#define ASSERT_NULL(p, fmt, ...) \
|
||||
if ((p) != NULL) { \
|
||||
@ -402,18 +403,6 @@ static void create_quic_client(nng_socket *sock, struct work **works,
|
||||
static void average_msgs(client_opts *opts, struct work **works);
|
||||
static void free_opts(void);
|
||||
|
||||
static void
|
||||
fatal(const char *msg, ...)
|
||||
{
|
||||
va_list ap;
|
||||
va_start(ap, msg);
|
||||
vfprintf(stderr, msg, ap);
|
||||
va_end(ap);
|
||||
fprintf(stderr, "\n");
|
||||
fflush(stderr);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
void
|
||||
console(const char *fmt, ...)
|
||||
{
|
||||
@ -424,12 +413,6 @@ console(const char *fmt, ...)
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
static void
|
||||
nng_fatal(const char *msg, int rv)
|
||||
{
|
||||
fatal("%s:%s", msg, nng_strerror(rv));
|
||||
}
|
||||
|
||||
static void
|
||||
properties_help(enum client_type type)
|
||||
{
|
||||
|
@ -6,21 +6,12 @@
|
||||
#include <stdarg.h>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
static int conn_opt_set(int argc, char **argv, nnb_conn_opt *opt);
|
||||
static int sub_opt_set(int argc, char **argv, nnb_sub_opt *opt);
|
||||
static int pub_opt_set(int argc, char **argv, nnb_pub_opt *opt);
|
||||
|
||||
static void
|
||||
fatal(const char *msg, ...)
|
||||
{
|
||||
va_list ap;
|
||||
va_start(ap, msg);
|
||||
vfprintf(stderr, msg, ap);
|
||||
va_end(ap);
|
||||
fprintf(stderr, "\n");
|
||||
}
|
||||
|
||||
static void
|
||||
init_tls(tls_opt *tls)
|
||||
{
|
||||
|
@ -10,22 +10,23 @@
|
||||
#include "nng_proxy.h"
|
||||
#include "client.h"
|
||||
|
||||
#include <nng/mqtt/mqtt_client.h>
|
||||
#include <nng/nng.h>
|
||||
#include <nng/protocol/bus0/bus.h>
|
||||
#include <nng/protocol/pair0/pair.h>
|
||||
#include <nng/protocol/pair1/pair.h>
|
||||
#include <nng/protocol/pipeline0/pull.h>
|
||||
#include <nng/protocol/pipeline0/push.h>
|
||||
#include <nng/protocol/pubsub0/pub.h>
|
||||
#include <nng/protocol/pubsub0/sub.h>
|
||||
#include <nng/protocol/reqrep0/rep.h>
|
||||
#include <nng/protocol/reqrep0/req.h>
|
||||
#include <nng/protocol/survey0/respond.h>
|
||||
#include <nng/protocol/survey0/survey.h>
|
||||
#include <nng/supplemental/tls/tls.h>
|
||||
#include <nng/supplemental/util/options.h>
|
||||
#include <nng/supplemental/util/platform.h>
|
||||
#include "nng/mqtt/mqtt_client.h"
|
||||
#include "nng/nng.h"
|
||||
#include "nng/protocol/bus0/bus.h"
|
||||
#include "nng/protocol/pair0/pair.h"
|
||||
#include "nng/protocol/pair1/pair.h"
|
||||
#include "nng/protocol/pipeline0/pull.h"
|
||||
#include "nng/protocol/pipeline0/push.h"
|
||||
#include "nng/protocol/pubsub0/pub.h"
|
||||
#include "nng/protocol/pubsub0/sub.h"
|
||||
#include "nng/protocol/reqrep0/rep.h"
|
||||
#include "nng/protocol/reqrep0/req.h"
|
||||
#include "nng/protocol/survey0/respond.h"
|
||||
#include "nng/protocol/survey0/survey.h"
|
||||
#include "nng/supplemental/tls/tls.h"
|
||||
#include "nng/supplemental/util/options.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
|
||||
#if defined(SUPP_CLIENT)
|
||||
@ -37,7 +38,6 @@ static int init_dialer_tls(nng_dialer d, const char *cacert, const char *cert,
|
||||
#endif
|
||||
|
||||
static void loadfile(const char *path, void **datap, size_t *lenp);
|
||||
static void fatal(const char *msg, ...);
|
||||
|
||||
#define ASSERT_NULL(p, fmt, ...) \
|
||||
if ((p) != NULL) { \
|
||||
@ -202,22 +202,6 @@ struct work {
|
||||
|
||||
static nng_atomic_bool *exit_signal;
|
||||
|
||||
static void
|
||||
fatal(const char *msg, ...)
|
||||
{
|
||||
va_list ap;
|
||||
va_start(ap, msg);
|
||||
vfprintf(stderr, msg, ap);
|
||||
va_end(ap);
|
||||
fprintf(stderr, "\n");
|
||||
}
|
||||
|
||||
static void
|
||||
nng_fatal(const char *msg, int rv)
|
||||
{
|
||||
fatal("%s:%s", msg, nng_strerror(rv));
|
||||
}
|
||||
|
||||
static void
|
||||
help(enum nng_proto type)
|
||||
{
|
||||
|
@ -17,22 +17,23 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <nng/nng.h>
|
||||
#include <nng/protocol/bus0/bus.h>
|
||||
#include <nng/protocol/pair0/pair.h>
|
||||
#include <nng/protocol/pair1/pair.h>
|
||||
#include <nng/protocol/pipeline0/pull.h>
|
||||
#include <nng/protocol/pipeline0/push.h>
|
||||
#include <nng/protocol/pubsub0/pub.h>
|
||||
#include <nng/protocol/pubsub0/sub.h>
|
||||
#include <nng/protocol/reqrep0/rep.h>
|
||||
#include <nng/protocol/reqrep0/req.h>
|
||||
#include <nng/protocol/survey0/respond.h>
|
||||
#include <nng/protocol/survey0/survey.h>
|
||||
#include <nng/supplemental/tls/tls.h>
|
||||
#include <nng/supplemental/util/options.h>
|
||||
#include <nng/supplemental/util/platform.h>
|
||||
#include <nng/transport/zerotier/zerotier.h>
|
||||
#include "nng/nng.h"
|
||||
#include "nng/protocol/bus0/bus.h"
|
||||
#include "nng/protocol/pair0/pair.h"
|
||||
#include "nng/protocol/pair1/pair.h"
|
||||
#include "nng/protocol/pipeline0/pull.h"
|
||||
#include "nng/protocol/pipeline0/push.h"
|
||||
#include "nng/protocol/pubsub0/pub.h"
|
||||
#include "nng/protocol/pubsub0/sub.h"
|
||||
#include "nng/protocol/reqrep0/rep.h"
|
||||
#include "nng/protocol/reqrep0/req.h"
|
||||
#include "nng/protocol/survey0/respond.h"
|
||||
#include "nng/protocol/survey0/survey.h"
|
||||
#include "nng/supplemental/tls/tls.h"
|
||||
#include "nng/supplemental/util/options.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/transport/zerotier/zerotier.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
// Globals. We need this to avoid passing around everything.
|
||||
int format = 0;
|
||||
@ -213,17 +214,6 @@ static nng_optspec opts[] = {
|
||||
{ .o_name = NULL, .o_val = 0 },
|
||||
};
|
||||
|
||||
static void
|
||||
fatal(const char *msg, ...)
|
||||
{
|
||||
va_list ap;
|
||||
va_start(ap, msg);
|
||||
vfprintf(stderr, msg, ap);
|
||||
va_end(ap);
|
||||
fprintf(stderr, "\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
static void
|
||||
help(void)
|
||||
{
|
||||
|
@ -6,13 +6,7 @@
|
||||
#include "nng/supplemental/nanolib/cJSON.h"
|
||||
#include "nng/supplemental/nanolib/conf.h"
|
||||
#include "nng/supplemental/util/options.h"
|
||||
|
||||
void
|
||||
fatal(int rv)
|
||||
{
|
||||
fprintf(stderr, "%s\n", nng_strerror(rv));
|
||||
exit(1);
|
||||
}
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
|
||||
struct work {
|
||||
@ -146,7 +140,7 @@ send_http(char *method, int id, char *payload)
|
||||
nng_aio_wait(aio);
|
||||
|
||||
if ((rv = nng_aio_result(aio)) != 0) {
|
||||
fatal(rv);
|
||||
nng_fatal("nng_aio_result", rv);
|
||||
}
|
||||
|
||||
if (nng_http_res_get_status(res) != NNG_HTTP_STATUS_OK) {
|
||||
@ -184,7 +178,7 @@ send_http(char *method, int id, char *payload)
|
||||
nng_aio_wait(aio);
|
||||
|
||||
if ((rv = nng_aio_result(aio)) != 0) {
|
||||
fatal(rv);
|
||||
nng_fatal("nng_aio_result", rv);
|
||||
}
|
||||
|
||||
cJSON *jso = cJSON_ParseWithLength(data, len);
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "nng/supplemental/http/http.h"
|
||||
#include "nng/supplemental/util/platform.h"
|
||||
#include "nng/supplemental/nanolib/conf.h"
|
||||
#include "nng/supplemental/nanolib/utils.h"
|
||||
|
||||
#include "include/rest_api.h"
|
||||
#include "include/web_server.h"
|
||||
@ -13,12 +14,6 @@
|
||||
|
||||
#define INPROC_URL "inproc://cli_rest"
|
||||
|
||||
#define fatal(msg, rv) \
|
||||
{ \
|
||||
fprintf(stderr, "%s:%s\n", msg, nng_strerror(rv)); \
|
||||
exit(1); \
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
SEND_REQ, // Sending REQ request
|
||||
RECV_REP, // Receiving REQ reply
|
||||
@ -177,7 +172,7 @@ rest_job_cb(void *arg)
|
||||
rest_recycle_job(job);
|
||||
return;
|
||||
default:
|
||||
fatal("bad case", NNG_ESTATE);
|
||||
nng_fatal("bad case", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -238,7 +233,7 @@ rest_start(uint16_t port)
|
||||
int rv;
|
||||
|
||||
if ((rv = nng_mtx_alloc(&job_lock)) != 0) {
|
||||
fatal("nng_mtx_alloc", rv);
|
||||
nng_fatal("nng_mtx_alloc", rv);
|
||||
}
|
||||
job_freelist = NULL;
|
||||
|
||||
@ -246,38 +241,38 @@ rest_start(uint16_t port)
|
||||
// from the argument list.
|
||||
snprintf(rest_addr, sizeof(rest_addr), REST_URL, port);
|
||||
if ((rv = nng_url_parse(&url, rest_addr)) != 0) {
|
||||
fatal("nng_url_parse", rv);
|
||||
nng_fatal("nng_url_parse", rv);
|
||||
}
|
||||
|
||||
// Create the REQ socket, and put it in raw mode, connected to
|
||||
// the remote REP server (our inproc server in this case).
|
||||
if ((rv = nng_req0_open(&req_sock)) != 0) {
|
||||
fatal("nng_req0_open", rv);
|
||||
nng_fatal("nng_req0_open", rv);
|
||||
}
|
||||
if ((rv = nng_dial(req_sock, INPROC_URL, NULL, NNG_FLAG_NONBLOCK)) !=
|
||||
0) {
|
||||
fatal("nng_dial(" INPROC_URL ")", rv);
|
||||
nng_fatal("nng_dial(" INPROC_URL ")", rv);
|
||||
}
|
||||
|
||||
// Get a suitable HTTP server instance. This creates one
|
||||
// if it doesn't already exist.
|
||||
if ((rv = nng_http_server_hold(&server, url)) != 0) {
|
||||
fatal("nng_http_server_hold", rv);
|
||||
nng_fatal("nng_http_server_hold", rv);
|
||||
}
|
||||
|
||||
// Allocate the handler - we use a dynamic handler for REST
|
||||
// using the function "rest_handle" declared above.
|
||||
rv = nng_http_handler_alloc(&handler, url->u_path, rest_handle);
|
||||
if (rv != 0) {
|
||||
fatal("nng_http_handler_alloc", rv);
|
||||
nng_fatal("nng_http_handler_alloc", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_set_tree(handler)) != 0) {
|
||||
fatal("nng_http_handler_set_tree", rv);
|
||||
nng_fatal("nng_http_handler_set_tree", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_set_method(handler, NULL)) != 0) {
|
||||
fatal("nng_http_handler_set_method", rv);
|
||||
nng_fatal("nng_http_handler_set_method", rv);
|
||||
}
|
||||
|
||||
// We want to collect the body, and we (arbitrarily) limit this to
|
||||
@ -287,32 +282,32 @@ rest_start(uint16_t port)
|
||||
// chunked transfers.
|
||||
if ((rv = nng_http_handler_collect_body(handler, true, 1024 * 128)) !=
|
||||
0) {
|
||||
fatal("nng_http_handler_collect_body", rv);
|
||||
nng_fatal("nng_http_handler_collect_body", rv);
|
||||
}
|
||||
|
||||
rv = nng_http_handler_alloc_directory(&handler_file, "", "./dist");
|
||||
if (rv != 0) {
|
||||
fatal("nng_http_handler_alloc_file", rv);
|
||||
nng_fatal("nng_http_handler_alloc_file", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_set_method(handler_file, "GET")) != 0) {
|
||||
fatal("nng_http_handler_set_method", rv);
|
||||
nng_fatal("nng_http_handler_set_method", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_handler_collect_body(handler_file, true, 1024)) !=
|
||||
0) {
|
||||
fatal("nng_http_handler_collect_body", rv);
|
||||
nng_fatal("nng_http_handler_collect_body", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_server_add_handler(server, handler_file)) != 0) {
|
||||
fatal("nng_http_handler_add_handler", rv);
|
||||
nng_fatal("nng_http_handler_add_handler", rv);
|
||||
}
|
||||
if ((rv = nng_http_server_add_handler(server, handler)) != 0) {
|
||||
fatal("nng_http_handler_add_handler", rv);
|
||||
nng_fatal("nng_http_handler_add_handler", rv);
|
||||
}
|
||||
|
||||
if ((rv = nng_http_server_start(server)) != 0) {
|
||||
fatal("nng_http_server_start", rv);
|
||||
nng_fatal("nng_http_server_start", rv);
|
||||
}
|
||||
|
||||
nng_url_free(url);
|
||||
@ -328,7 +323,7 @@ inproc_server(void *arg)
|
||||
|
||||
int rv;
|
||||
if ((rv = nng_rep0_open(&sock)) != 0) {
|
||||
fatal("nng_rep0_open", rv);
|
||||
nng_fatal("nng_rep0_open", rv);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < proxy->http_server->parallel; i++) {
|
||||
@ -336,7 +331,7 @@ inproc_server(void *arg)
|
||||
}
|
||||
|
||||
if ((rv = nng_listen(sock, INPROC_URL, NULL, 0)) != 0) {
|
||||
fatal("nng_listen", rv);
|
||||
nng_fatal("nng_listen", rv);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < proxy->http_server->parallel; i++) {
|
||||
@ -369,7 +364,7 @@ inproc_cb(void *arg)
|
||||
|
||||
case SRV_RECV:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
fatal("nng_ctx_recv", rv);
|
||||
nng_fatal("nng_ctx_recv", rv);
|
||||
}
|
||||
|
||||
msg = nng_aio_get_msg(work->aio);
|
||||
@ -391,14 +386,14 @@ inproc_cb(void *arg)
|
||||
case SRV_SEND:
|
||||
if ((rv = nng_aio_result(work->aio)) != 0) {
|
||||
nng_msg_free(work->msg);
|
||||
fatal("nng_ctx_send", rv);
|
||||
nng_fatal("nng_ctx_send", rv);
|
||||
}
|
||||
work->state = SRV_RECV;
|
||||
nng_ctx_recv(work->ctx, work->aio);
|
||||
break;
|
||||
|
||||
default:
|
||||
fatal("bad state!", NNG_ESTATE);
|
||||
nng_fatal("bad state!", NNG_ESTATE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -410,13 +405,13 @@ alloc_work(nng_socket sock, proxy_info *proxy)
|
||||
int rv;
|
||||
|
||||
if ((w = nng_alloc(sizeof(*w))) == NULL) {
|
||||
fatal("nng_alloc", NNG_ENOMEM);
|
||||
nng_fatal("nng_alloc", NNG_ENOMEM);
|
||||
}
|
||||
if ((rv = nng_aio_alloc(&w->aio, inproc_cb, w)) != 0) {
|
||||
fatal("nng_aio_alloc", rv);
|
||||
nng_fatal("nng_aio_alloc", rv);
|
||||
}
|
||||
if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
|
||||
fatal("nng_ctx_open", rv);
|
||||
nng_fatal("nng_ctx_open", rv);
|
||||
}
|
||||
w->proxy = proxy;
|
||||
w->state = SRV_INIT;
|
||||
@ -429,7 +424,7 @@ start_rest_server(proxy_info *proxy)
|
||||
int rv;
|
||||
rv = nng_thread_create(&inproc_thr, inproc_server, proxy);
|
||||
if (rv != 0) {
|
||||
fatal("cannot start inproc server", rv);
|
||||
nng_fatal("cannot start inproc server", rv);
|
||||
}
|
||||
|
||||
rest_start(proxy->http_server->port);
|
||||
|
Reference in New Issue
Block a user