mirror of
https://github.com/nanomq/nanomq.git
synced 2025-06-22 01:57:27 +00:00
1185 lines
27 KiB
C
1185 lines
27 KiB
C
//
|
|
// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
|
|
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
|
|
// Copyright 2020 Lager Data, Inc. <support@lagerdata.com>
|
|
//
|
|
// This software is supplied under the terms of the MIT License, a
|
|
// copy of which should be located in the distribution where this
|
|
// file was obtained (LICENSE.txt). A copy of the license may also be
|
|
// found online at https://opensource.org/licenses/MIT.
|
|
//
|
|
|
|
#include <ctype.h>
|
|
#include <errno.h>
|
|
#include <stdarg.h>
|
|
#include <stdbool.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include "nng/nng.h"
|
|
#include "nng/protocol/bus0/bus.h"
|
|
#include "nng/protocol/pair0/pair.h"
|
|
#include "nng/protocol/pair1/pair.h"
|
|
#include "nng/protocol/pipeline0/pull.h"
|
|
#include "nng/protocol/pipeline0/push.h"
|
|
#include "nng/protocol/pubsub0/pub.h"
|
|
#include "nng/protocol/pubsub0/sub.h"
|
|
#include "nng/protocol/reqrep0/rep.h"
|
|
#include "nng/protocol/reqrep0/req.h"
|
|
#include "nng/protocol/survey0/respond.h"
|
|
#include "nng/protocol/survey0/survey.h"
|
|
#include "nng/supplemental/tls/tls.h"
|
|
#include "nng/supplemental/util/options.h"
|
|
#include "nng/supplemental/util/platform.h"
|
|
#include "nng/transport/zerotier/zerotier.h"
|
|
#include "nng/supplemental/nanolib/utils.h"
|
|
|
|
// Globals. We need this to avoid passing around everything.
|
|
int format = 0;
|
|
int proto = 0;
|
|
int verbose = 0;
|
|
int delay = 0;
|
|
nng_duration interval = NNG_DURATION_INFINITE;
|
|
nng_duration sendtimeo = NNG_DURATION_INFINITE;
|
|
nng_duration recvtimeo = NNG_DURATION_INFINITE;
|
|
void * data = NULL;
|
|
size_t datalen = 0;
|
|
int compat = 0;
|
|
int async = 0;
|
|
int insecure = 0;
|
|
void * cacert = NULL;
|
|
size_t cacertlen = 0;
|
|
void * keyfile = NULL;
|
|
size_t keylen = 0;
|
|
void * certfile = NULL;
|
|
size_t certlen = 0;
|
|
const char * zthome = NULL;
|
|
int count = 0;
|
|
int recvmaxsz = -1;
|
|
|
|
// Options, must start at 1 because zero is sentinel.
|
|
enum options {
|
|
OPT_HELP = 1,
|
|
OPT_VERBOSE,
|
|
OPT_SILENT,
|
|
OPT_REP0,
|
|
OPT_REQ0,
|
|
OPT_PUSH0,
|
|
OPT_PULL0,
|
|
OPT_PUB0,
|
|
OPT_SUB0,
|
|
OPT_SURVEY0,
|
|
OPT_RESPOND0,
|
|
OPT_PAIR0,
|
|
OPT_PAIR1,
|
|
OPT_PAIR,
|
|
OPT_BUS0,
|
|
OPT_DIAL,
|
|
OPT_LISTEN,
|
|
OPT_DATA,
|
|
OPT_FILE,
|
|
OPT_SUBSCRIBE,
|
|
OPT_INTERVAL,
|
|
OPT_DELAY,
|
|
OPT_COUNT,
|
|
OPT_FORMAT,
|
|
OPT_RAW,
|
|
OPT_ASCII,
|
|
OPT_QUOTED,
|
|
OPT_MSGPACK,
|
|
OPT_HEX,
|
|
OPT_BLANK, // no printing, not an actual option.
|
|
OPT_COMPAT,
|
|
OPT_ASYNC,
|
|
OPT_RCV_TIMEO,
|
|
OPT_SND_TIMEO,
|
|
OPT_SOCK_NAME,
|
|
OPT_LISTEN_IPC,
|
|
OPT_DIAL_IPC,
|
|
OPT_LISTEN_LOCAL,
|
|
OPT_DIAL_LOCAL,
|
|
OPT_INSECURE,
|
|
OPT_CACERT,
|
|
OPT_KEYFILE,
|
|
OPT_CERTFILE,
|
|
OPT_VERSION,
|
|
OPT_RECVMAXSZ,
|
|
OPT_ZTHOME,
|
|
};
|
|
|
|
static nng_optspec opts[] = {
|
|
{ .o_name = "help", .o_short = 'h', .o_val = OPT_HELP },
|
|
{ .o_name = "verbose", .o_short = 'v', .o_val = OPT_VERBOSE },
|
|
{ .o_name = "silent", .o_short = 'q', .o_val = OPT_SILENT },
|
|
{ .o_name = "req0", .o_val = OPT_REQ0 },
|
|
{ .o_name = "rep0", .o_val = OPT_REP0 },
|
|
{ .o_name = "push0", .o_val = OPT_PUSH0 },
|
|
{ .o_name = "pull0", .o_val = OPT_PULL0 },
|
|
{ .o_name = "pub0", .o_val = OPT_PUB0 },
|
|
{ .o_name = "sub", .o_val = OPT_SUB0 }, // explicit for alias
|
|
{ .o_name = "sub0", .o_val = OPT_SUB0 },
|
|
{ .o_name = "surveyor0", .o_val = OPT_SURVEY0 },
|
|
{ .o_name = "respondent0", .o_val = OPT_RESPOND0 },
|
|
{ .o_name = "pair", .o_val = OPT_PAIR },
|
|
{ .o_name = "pair0", .o_val = OPT_PAIR0 },
|
|
{ .o_name = "pair1", .o_val = OPT_PAIR1 },
|
|
{ .o_name = "bus0", .o_val = OPT_BUS0 },
|
|
{ .o_name = "dial", .o_val = OPT_DIAL, .o_arg = true },
|
|
{ .o_name = "listen", .o_val = OPT_LISTEN, .o_arg = true },
|
|
{ .o_name = "data", .o_short = 'D', .o_val = OPT_DATA, .o_arg = true },
|
|
{ .o_name = "file", .o_short = 'F', .o_val = OPT_FILE, .o_arg = true },
|
|
{ .o_name = "subscribe", .o_val = OPT_SUBSCRIBE, .o_arg = true },
|
|
{ .o_name = "format", .o_val = OPT_FORMAT, .o_arg = true },
|
|
{ .o_name = "ascii", .o_short = 'A', .o_val = OPT_ASCII },
|
|
{ .o_name = "quoted", .o_short = 'Q', .o_val = OPT_QUOTED },
|
|
{ .o_name = "raw", .o_val = OPT_RAW },
|
|
{ .o_name = "hex", .o_val = OPT_HEX },
|
|
{ .o_name = "compat", .o_val = OPT_COMPAT },
|
|
{ .o_name = "async", .o_val = OPT_ASYNC },
|
|
{ .o_name = "msgpack", .o_val = OPT_MSGPACK },
|
|
|
|
{
|
|
.o_name = "recv-maxsz",
|
|
.o_short = 'Z',
|
|
.o_val = OPT_RECVMAXSZ,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "count",
|
|
.o_short = 'C',
|
|
.o_val = OPT_COUNT,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "delay",
|
|
.o_short = 'd',
|
|
.o_val = OPT_DELAY,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "interval",
|
|
.o_short = 'i',
|
|
.o_val = OPT_INTERVAL,
|
|
.o_arg = true,
|
|
},
|
|
{ .o_name = "recv-timeout", .o_val = OPT_RCV_TIMEO, .o_arg = true },
|
|
{ .o_name = "send-timeout", .o_val = OPT_SND_TIMEO, .o_arg = true },
|
|
{ .o_name = "socket-name", .o_val = OPT_SOCK_NAME, .o_arg = true },
|
|
|
|
// Legacy compatibility with nanocat.
|
|
{ .o_name = "bind", .o_val = OPT_LISTEN, .o_arg = true },
|
|
{ .o_name = "connect", .o_val = OPT_DIAL, .o_arg = true },
|
|
{
|
|
.o_name = "bind-ipc",
|
|
.o_short = 'X',
|
|
.o_val = OPT_LISTEN_IPC,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "connect-ipc",
|
|
.o_short = 'x',
|
|
.o_val = OPT_DIAL_IPC,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "bind-local",
|
|
.o_short = 'L',
|
|
.o_val = OPT_LISTEN_LOCAL,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "connect-local",
|
|
.o_short = 'l',
|
|
.o_val = OPT_DIAL_LOCAL,
|
|
.o_arg = true,
|
|
},
|
|
{ .o_name = "insecure", .o_short = 'k', .o_val = OPT_INSECURE },
|
|
{ .o_name = "cacert", .o_val = OPT_CACERT, .o_arg = true },
|
|
{ .o_name = "key", .o_val = OPT_KEYFILE, .o_arg = true },
|
|
{
|
|
.o_name = "cert",
|
|
.o_short = 'E',
|
|
.o_val = OPT_CERTFILE,
|
|
.o_arg = true,
|
|
},
|
|
{
|
|
.o_name = "zt-home",
|
|
.o_val = OPT_ZTHOME,
|
|
.o_arg = true,
|
|
},
|
|
{ .o_name = "version", .o_short = 'V', .o_val = OPT_VERSION },
|
|
|
|
// Sentinel.
|
|
{ .o_name = NULL, .o_val = 0 },
|
|
};
|
|
|
|
static void
|
|
help(void)
|
|
{
|
|
printf(
|
|
"Usage: nngcat <proto> <addr>... [<fmt>] [<opts>...] [<src>]\n\n");
|
|
printf("<proto> must be one of:\n");
|
|
printf(" --req0 (or alias --req)\n");
|
|
printf(" --rep0 (or alias --rep)\n");
|
|
printf(" --pub0 (or alias --pub)\n");
|
|
printf(" --sub0 (or alias --sub)\n");
|
|
printf(" --push0 (or alias --push)\n");
|
|
printf(" --pull0 (or alias --pull)\n");
|
|
printf(" --surveyor0 (or alias --surveyor)\n");
|
|
printf(" --respondent0 (or alias --respondent)\n");
|
|
printf(" --pair0\n");
|
|
printf(" --pair1\n");
|
|
printf(" --pair (alias for either pair0 or pair1)\n");
|
|
printf("\n<addr> must be one or more of:\n");
|
|
printf(" --dial <url> (or alias --connect <url>)\n");
|
|
printf(" --listen <url> (or alias --bind <url>)\n");
|
|
printf(" --bind-local <port> (or alias -L <port>)\n");
|
|
printf(" --connect-local <port> (or alias -l <port>)\n");
|
|
printf(" --bind-ipc <path> (or alias -X <path>)\n");
|
|
printf(" --connect-ipc <path> (or alias -x <path>)\n");
|
|
printf("\n<fmt> may be one or none of:\n");
|
|
printf(" --format <no|ascii|hex|msgpack|quoted|raw>\n");
|
|
printf(" --ascii (or alias -A)\n");
|
|
printf(" --quoted (or alias -Q)\n");
|
|
printf(" --hex\n");
|
|
printf(" --msgpack\n");
|
|
printf(" --raw\n");
|
|
printf("\n<opts> may be any of:\n");
|
|
printf(" --subscribe <topic> (only with --sub protocol)\n");
|
|
printf(" --silent (or alias -q)\n");
|
|
printf(" --verbose (or alias -v)\n");
|
|
printf(" --count <num> (or alias -C <num>)\n");
|
|
printf(" --delay <secs> (or alias -d <secs>)\n");
|
|
printf(" --interval <secs> (or alias -i <secs>)\n");
|
|
printf(" --recv-timeout <secs>\n");
|
|
printf(" --send-timeout <secs>\n");
|
|
printf(" --recv-maxsz <size> (or alias -Z <size>)\n");
|
|
printf(" --compat\n");
|
|
printf(" --async\n");
|
|
printf(" --insecure (or alias -k)\n");
|
|
printf(" --cacert <file>\n");
|
|
printf(" --cert <file> (or alias -E <file>)\n");
|
|
printf(" --key <file>\n");
|
|
printf(" --zt-home <path>\n");
|
|
printf("\n<src> may be one of:\n");
|
|
printf(" --file <file> (or alias -F <file>). "
|
|
"Use - for standard input.\n");
|
|
printf(" --data <data> (or alias -D <data>)\n");
|
|
exit(1);
|
|
}
|
|
|
|
static int
|
|
intarg(const char *val, int maxv)
|
|
{
|
|
int v = 0;
|
|
|
|
if (val[0] == '\0') {
|
|
fatal("Empty integer argument.");
|
|
}
|
|
while (*val != '\0') {
|
|
if (!isdigit(*val)) {
|
|
fatal("Integer argument expected.");
|
|
}
|
|
v *= 10;
|
|
v += ((*val) - '0');
|
|
val++;
|
|
if (v > maxv) {
|
|
fatal("Integer argument too large.");
|
|
}
|
|
}
|
|
if (v < 0) {
|
|
fatal("Integer argument overflow.");
|
|
}
|
|
return (v);
|
|
}
|
|
|
|
// This reads a file into memory. Care is taken to ensure that
|
|
// the buffer is one byte larger and contains a terminating
|
|
// NUL. (Useful for key files and such.)
|
|
static void
|
|
loadfile(const char *path, void **datap, size_t *lenp)
|
|
{
|
|
FILE * f;
|
|
size_t total_read = 0;
|
|
size_t allocation_size = BUFSIZ;
|
|
char * fdata;
|
|
char * realloc_result;
|
|
|
|
if (strcmp(path, "-") == 0) {
|
|
f = stdin;
|
|
} else {
|
|
if ((f = fopen(path, "rb")) == NULL) {
|
|
fatal(
|
|
"Cannot open file %s: %s", path, strerror(errno));
|
|
}
|
|
}
|
|
|
|
if ((fdata = malloc(allocation_size + 1)) == NULL) {
|
|
fatal("Out of memory.");
|
|
}
|
|
|
|
while (1) {
|
|
total_read += fread(
|
|
fdata + total_read, 1, allocation_size - total_read, f);
|
|
if (ferror(f)) {
|
|
if (errno == EINTR) {
|
|
continue;
|
|
}
|
|
fatal(
|
|
"Read from %s failed: %s", path, strerror(errno));
|
|
}
|
|
if (feof(f)) {
|
|
break;
|
|
}
|
|
if (total_read == allocation_size) {
|
|
if (allocation_size > SIZE_MAX / 2) {
|
|
fatal("Out of memory.");
|
|
}
|
|
allocation_size *= 2;
|
|
if ((realloc_result = realloc(
|
|
fdata, allocation_size + 1)) == NULL) {
|
|
free(fdata);
|
|
fatal("Out of memory.");
|
|
}
|
|
fdata = realloc_result;
|
|
}
|
|
}
|
|
if (f != stdin) {
|
|
fclose(f);
|
|
}
|
|
fdata[total_read] = '\0';
|
|
*datap = fdata;
|
|
*lenp = total_read;
|
|
}
|
|
|
|
static void
|
|
configtls(nng_tls_config *tls)
|
|
{
|
|
int rv = 0;
|
|
if (insecure) {
|
|
rv = nng_tls_config_auth_mode(tls, NNG_TLS_AUTH_MODE_NONE);
|
|
}
|
|
if ((rv == 0) && (certfile != NULL)) {
|
|
keyfile = keyfile ? keyfile : certfile;
|
|
rv = nng_tls_config_own_cert(tls, certfile, keyfile, NULL);
|
|
}
|
|
if ((rv == 0) && (cacert != NULL)) {
|
|
rv = nng_tls_config_ca_chain(tls, cacert, NULL);
|
|
}
|
|
if (rv != 0) {
|
|
fatal("Unable to configure TLS: %s", nng_strerror(rv));
|
|
}
|
|
}
|
|
|
|
struct addr {
|
|
struct addr *next;
|
|
int mode;
|
|
char * val;
|
|
};
|
|
|
|
struct addr **
|
|
addaddr(struct addr **endp, int mode, const char *a)
|
|
{
|
|
struct addr *na;
|
|
|
|
if (((na = malloc(sizeof(*na))) == NULL) ||
|
|
((na->val = malloc(strlen(a) + 1)) == NULL)) {
|
|
fatal("Out of memory.");
|
|
}
|
|
na->mode = mode;
|
|
memcpy(na->val, a, strlen(a) + 1);
|
|
na->next = NULL;
|
|
*endp = na;
|
|
return (&na->next);
|
|
}
|
|
|
|
struct topic {
|
|
struct topic *next;
|
|
char * val;
|
|
};
|
|
|
|
struct topic **
|
|
nng_addtopic(struct topic **endp, const char *s)
|
|
{
|
|
struct topic *t;
|
|
|
|
if (((t = malloc(sizeof(*t))) == NULL) ||
|
|
((t->val = malloc(strlen(s) + 1)) == NULL)) {
|
|
fatal("Out of memory.");
|
|
}
|
|
memcpy(t->val, s, strlen(s) + 1);
|
|
t->next = NULL;
|
|
*endp = t;
|
|
return (&t->next);
|
|
}
|
|
|
|
void
|
|
printmsg(char *buf, size_t len)
|
|
{
|
|
switch (format) {
|
|
case OPT_BLANK: // Suppress contents.
|
|
return;
|
|
case OPT_RAW: // Just emit raw contents.
|
|
fwrite(buf, 1, len, stdout);
|
|
break;
|
|
case OPT_ASCII: // ASCII, but non-ASCII substituted with '.'
|
|
for (size_t i = 0; i < len; i++) {
|
|
if (isprint(buf[i])) {
|
|
putchar(buf[i]);
|
|
} else {
|
|
putchar('.');
|
|
}
|
|
}
|
|
break;
|
|
case OPT_QUOTED: // C style quoted strings.
|
|
putchar('"');
|
|
for (size_t i = 0; i < len; i++) {
|
|
switch (buf[i]) {
|
|
case '\n':
|
|
putchar('\\');
|
|
putchar('n');
|
|
break;
|
|
case '\r':
|
|
putchar('\\');
|
|
putchar('r');
|
|
break;
|
|
case '\t':
|
|
putchar('\\');
|
|
putchar('t');
|
|
break;
|
|
case '"':
|
|
case '\\':
|
|
putchar('\\');
|
|
putchar(buf[i]);
|
|
break;
|
|
default:
|
|
if (isprint(buf[i])) {
|
|
fputc(buf[i], stdout);
|
|
} else {
|
|
printf("\\x%02x", (uint8_t) buf[i]);
|
|
}
|
|
}
|
|
}
|
|
putchar('"');
|
|
putchar('\n');
|
|
break;
|
|
case OPT_MSGPACK: // MsgPack, we just encode prefix + len, then
|
|
// raw.
|
|
if (len < 256) {
|
|
putchar('\xc4');
|
|
putchar(len & 0xff);
|
|
} else if (len < 65536) {
|
|
putchar('\xc5');
|
|
putchar((len >> 8) & 0xff);
|
|
putchar(len & 0xff);
|
|
} else {
|
|
putchar('\xc6');
|
|
putchar((len >> 24) & 0xff);
|
|
putchar((len >> 16) & 0xff);
|
|
putchar((len >> 8) & 0xff);
|
|
putchar(len & 0xff);
|
|
}
|
|
fwrite(buf, 1, len, stdout);
|
|
break;
|
|
case OPT_HEX: // Hex, quoted C string encoded with hex
|
|
// literals.
|
|
putchar('"');
|
|
for (size_t i = 0; i < len; i++) {
|
|
printf("\\x%02x", (uint8_t) buf[i]);
|
|
}
|
|
putchar('"');
|
|
putchar('\n');
|
|
break;
|
|
}
|
|
fflush(stdout);
|
|
}
|
|
|
|
void
|
|
recvloop(nng_socket sock)
|
|
{
|
|
int iters = 0;
|
|
for (;;) {
|
|
int rv;
|
|
nng_msg *msg;
|
|
|
|
if ((count > 0) && (iters >= count)) {
|
|
break;
|
|
}
|
|
rv = nng_recvmsg(sock, &msg, 0);
|
|
iters++;
|
|
switch (rv) {
|
|
case NNG_ETIMEDOUT:
|
|
case NNG_ESTATE:
|
|
// Either a regular timeout, or we reached the
|
|
// end of an event like a survey completing.
|
|
return;
|
|
case 0:
|
|
printmsg(nng_msg_body(msg), nng_msg_len(msg));
|
|
nng_msg_free(msg);
|
|
continue;
|
|
default:
|
|
fatal("Receive error: %s", nng_strerror(rv));
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
resploop(nng_socket sock)
|
|
{
|
|
int iters = 0;
|
|
for (;;) {
|
|
int rv;
|
|
nng_msg *msg;
|
|
|
|
rv = nng_recvmsg(sock, &msg, 0);
|
|
if (rv != 0) {
|
|
fatal("Receive error: %s", nng_strerror(rv));
|
|
}
|
|
printmsg(nng_msg_body(msg), nng_msg_len(msg));
|
|
nng_msg_clear(msg);
|
|
if ((rv = nng_msg_append(msg, data, datalen)) != 0) {
|
|
fatal("%s", nng_strerror(rv));
|
|
}
|
|
if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
|
|
fatal("Send error: %s", nng_strerror(rv));
|
|
}
|
|
|
|
iters++;
|
|
if ((count > 0) && (iters >= count)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
nng_msleep(200);
|
|
}
|
|
|
|
void
|
|
sendloop(nng_socket sock)
|
|
{
|
|
int iters = 0;
|
|
|
|
if (data == NULL) {
|
|
fatal("No data to send (specify with --data or --file)");
|
|
}
|
|
if (delay > 0) {
|
|
nng_msleep(delay);
|
|
}
|
|
|
|
for (;;) {
|
|
int rv;
|
|
nng_msg * msg;
|
|
nng_time start;
|
|
nng_time end;
|
|
nng_duration delta;
|
|
|
|
start = nng_clock();
|
|
if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
|
|
((rv = nng_msg_append(msg, data, datalen)) != 0)) {
|
|
fatal("%s", nng_strerror(rv));
|
|
}
|
|
if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
|
|
fatal("Send error: %s", nng_strerror(rv));
|
|
}
|
|
end = nng_clock();
|
|
delta = (nng_duration)(end - start);
|
|
|
|
iters++;
|
|
// By default, we don't loop.
|
|
if (((interval < 0) && (count == 0)) ||
|
|
((count > 0) && (iters >= count))) {
|
|
break;
|
|
}
|
|
|
|
// We sleep, but we account for time spent, so that our
|
|
// interval appears more or less constant. Of course
|
|
// if we took more than the interval here, then we skip
|
|
// the sleep altogether.
|
|
if ((delta >= 0) && (delta < interval)) {
|
|
nng_msleep(interval - delta);
|
|
}
|
|
}
|
|
// Wait a bit to give queues a chance to drain.
|
|
nng_msleep(200);
|
|
}
|
|
|
|
void
|
|
sendrecv(nng_socket sock)
|
|
{
|
|
int iters = 0;
|
|
|
|
if (data == NULL) {
|
|
fatal("No data to send (specify with --data or --file)");
|
|
}
|
|
if (delay > 0) {
|
|
nng_msleep(delay);
|
|
}
|
|
|
|
// We start by sending a message, then we receive iteratively
|
|
// but we set a concrete timeout if interval is set, to ensure
|
|
// that we exit the receive loop, and can continue.
|
|
for (;;) {
|
|
int rv;
|
|
nng_msg * msg;
|
|
nng_time start;
|
|
nng_time end;
|
|
nng_duration delta;
|
|
|
|
start = nng_clock();
|
|
if (((rv = nng_msg_alloc(&msg, 0)) != 0) ||
|
|
((rv = nng_msg_append(msg, data, datalen)) != 0)) {
|
|
fatal("%s", nng_strerror(rv));
|
|
}
|
|
if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
|
|
fatal("Send error: %s", nng_strerror(rv));
|
|
}
|
|
if ((interval < 0) && (count == 0)) {
|
|
// Only one iteration through.
|
|
recvloop(sock);
|
|
break;
|
|
}
|
|
|
|
// We would like to use recvloop, but we need to reset
|
|
// our timeout each time, as the timer counts down
|
|
// towards zero. Furthermore, with survey, we don't
|
|
// want to increment the iteration count.
|
|
|
|
for (;;) {
|
|
delta = (nng_duration)(nng_clock() - start);
|
|
|
|
nng_duration expire = interval - delta;
|
|
if ((recvtimeo >= 0) && (expire > recvtimeo)) {
|
|
expire = recvtimeo;
|
|
}
|
|
rv =
|
|
nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, expire);
|
|
if (rv != 0) {
|
|
fatal("Cannot set recv timeout: %s",
|
|
nng_strerror(rv));
|
|
}
|
|
|
|
rv = nng_recvmsg(sock, &msg, 0);
|
|
switch (rv) {
|
|
case 0:
|
|
printmsg(nng_msg_body(msg), nng_msg_len(msg));
|
|
nng_msg_free(msg);
|
|
continue;
|
|
case NNG_ETIMEDOUT:
|
|
case NNG_ESTATE:
|
|
// We're done receiving
|
|
break;
|
|
default:
|
|
fatal("Cannot receive: %s", nng_strerror(rv));
|
|
break;
|
|
}
|
|
|
|
// We're done receiving, break out.
|
|
break;
|
|
}
|
|
|
|
end = nng_clock();
|
|
delta = (nng_duration)(end - start);
|
|
|
|
iters++;
|
|
if ((count > 0) && (iters >= count)) {
|
|
break;
|
|
}
|
|
|
|
// We sleep, but we account for time spent, so that our
|
|
// interval appears more or less constant. Of course
|
|
// if we took more than the interval here, then we skip
|
|
// the sleep altogether.
|
|
if ((delta >= 0) && (delta < interval)) {
|
|
nng_msleep(interval - delta);
|
|
}
|
|
}
|
|
}
|
|
|
|
int
|
|
nng_cat_dflt(int ac, char **av)
|
|
{
|
|
int idx = 1;
|
|
char * arg;
|
|
int val;
|
|
int rv;
|
|
char scratch[512];
|
|
struct addr * addrs = NULL;
|
|
struct addr ** addrend;
|
|
struct topic * topics = NULL;
|
|
struct topic **topicend;
|
|
nng_socket sock;
|
|
int port;
|
|
|
|
idx = 1;
|
|
addrend = &addrs;
|
|
topicend = &topics;
|
|
|
|
while ((rv = nng_opts_parse(ac - 1, av + 1, opts, &val, &arg, &idx)) ==
|
|
0) {
|
|
switch (val) {
|
|
case OPT_HELP:
|
|
help();
|
|
break;
|
|
case OPT_REQ0:
|
|
case OPT_REP0:
|
|
case OPT_SUB0:
|
|
case OPT_PUB0:
|
|
case OPT_BUS0:
|
|
case OPT_SURVEY0:
|
|
case OPT_RESPOND0:
|
|
case OPT_PAIR0:
|
|
case OPT_PAIR1:
|
|
case OPT_PUSH0:
|
|
case OPT_PULL0:
|
|
if (proto != 0) {
|
|
fatal("Only one protocol may be "
|
|
"specified.");
|
|
}
|
|
proto = val;
|
|
break;
|
|
case OPT_DIAL:
|
|
case OPT_LISTEN:
|
|
addrend = addaddr(addrend, val, arg);
|
|
break;
|
|
case OPT_DIAL_LOCAL:
|
|
case OPT_LISTEN_LOCAL:
|
|
port = intarg(arg, 65536);
|
|
snprintf(scratch, sizeof(scratch),
|
|
"tcp://127.0.0.1:%d", port);
|
|
addrend = addaddr(addrend, val, scratch);
|
|
break;
|
|
case OPT_DIAL_IPC:
|
|
case OPT_LISTEN_IPC:
|
|
snprintf(scratch, sizeof(scratch), "ipc:///%s", arg);
|
|
addrend = addaddr(addrend, val, scratch);
|
|
break;
|
|
case OPT_COUNT:
|
|
count = intarg(arg, 0x7fffffff);
|
|
break;
|
|
case OPT_SUBSCRIBE:
|
|
topicend = nng_addtopic(topicend, arg);
|
|
break;
|
|
case OPT_VERBOSE:
|
|
case OPT_SILENT:
|
|
verbose = val;
|
|
break;
|
|
case OPT_DELAY:
|
|
delay = intarg(arg, 86400) * 1000; // max 1 day
|
|
break;
|
|
case OPT_INTERVAL:
|
|
interval = intarg(arg, 86400) * 1000; // max 1 day
|
|
break;
|
|
case OPT_SND_TIMEO:
|
|
sendtimeo = intarg(arg, 86400) * 1000; // max 1 day
|
|
break;
|
|
case OPT_RCV_TIMEO:
|
|
recvtimeo = intarg(arg, 86400) * 1000; // max 1 day
|
|
break;
|
|
case OPT_RECVMAXSZ:
|
|
recvmaxsz = intarg(arg, 0x7fffffff);
|
|
if (recvmaxsz == 0) {
|
|
recvmaxsz = 0x7fffffff;
|
|
}
|
|
break;
|
|
case OPT_COMPAT:
|
|
compat = 1;
|
|
break;
|
|
case OPT_ASYNC:
|
|
async = NNG_FLAG_NONBLOCK;
|
|
break;
|
|
case OPT_ASCII:
|
|
case OPT_RAW:
|
|
case OPT_QUOTED:
|
|
case OPT_MSGPACK:
|
|
case OPT_HEX:
|
|
if (format != 0) {
|
|
fatal("Format may be specified only "
|
|
"once.");
|
|
}
|
|
format = val;
|
|
break;
|
|
case OPT_FORMAT:
|
|
if (format != 0) {
|
|
fatal("Format may be specified only "
|
|
"once.");
|
|
}
|
|
if (strcmp(arg, "no") == 0) {
|
|
format = OPT_BLANK;
|
|
} else if (strcmp(arg, "ascii") == 0) {
|
|
format = OPT_ASCII;
|
|
} else if (strcmp(arg, "hex") == 0) {
|
|
format = OPT_HEX;
|
|
} else if (strcmp(arg, "quoted") == 0) {
|
|
format = OPT_QUOTED;
|
|
} else if (strcmp(arg, "raw") == 0) {
|
|
format = OPT_RAW;
|
|
} else if (strcmp(arg, "msgpack") == 0) {
|
|
format = OPT_MSGPACK;
|
|
} else {
|
|
fatal("Invalid format specified.");
|
|
}
|
|
break;
|
|
case OPT_FILE:
|
|
if (data != NULL) {
|
|
fatal("Data (--file, --data) may be "
|
|
"specified "
|
|
"only once.");
|
|
}
|
|
loadfile(arg, &data, &datalen);
|
|
break;
|
|
case OPT_DATA:
|
|
if (data != NULL) {
|
|
fatal("Data (--file, --data) may be "
|
|
"specified "
|
|
"only once.");
|
|
}
|
|
if ((data = malloc(strlen(arg) + 1)) == NULL) {
|
|
fatal("Out of memory.");
|
|
}
|
|
memcpy(data, arg, strlen(arg) + 1);
|
|
datalen = strlen(arg);
|
|
break;
|
|
case OPT_CACERT:
|
|
if (cacert != NULL) {
|
|
fatal("CA Certificate (--cacert) may be "
|
|
"specified only once.");
|
|
}
|
|
loadfile(arg, &cacert, &cacertlen);
|
|
break;
|
|
case OPT_KEYFILE:
|
|
if (keyfile != NULL) {
|
|
fatal(
|
|
"Key (--key) may be specified only once.");
|
|
}
|
|
loadfile(arg, &keyfile, &keylen);
|
|
break;
|
|
case OPT_CERTFILE:
|
|
if (certfile != NULL) {
|
|
fatal("Cert (--cert) may be specified "
|
|
"only "
|
|
"once.");
|
|
}
|
|
loadfile(arg, &certfile, &certlen);
|
|
break;
|
|
case OPT_ZTHOME:
|
|
zthome = arg;
|
|
break;
|
|
case OPT_INSECURE:
|
|
insecure = 1;
|
|
break;
|
|
case OPT_VERSION:
|
|
printf("%s\n", nng_version());
|
|
exit(0);
|
|
}
|
|
}
|
|
switch (rv) {
|
|
case NNG_EINVAL:
|
|
fatal("Option %s is invalid.", av[idx]);
|
|
break;
|
|
case NNG_EAMBIGUOUS:
|
|
fatal("Option %s is ambiguous (specify in full).", av[idx]);
|
|
break;
|
|
case NNG_ENOARG:
|
|
fatal("Option %s requires argument.", av[idx]);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (addrs == NULL) {
|
|
fatal("No address specified.");
|
|
}
|
|
|
|
if (compat) {
|
|
if (async != 0) {
|
|
fatal("Options --async and --compat are "
|
|
"incompatible.");
|
|
}
|
|
if (count != 0) {
|
|
fatal("Options --count and --compat are "
|
|
"incompatible.");
|
|
}
|
|
if (proto == OPT_PAIR) {
|
|
proto = OPT_PAIR0;
|
|
}
|
|
if (proto == OPT_PAIR1) {
|
|
fatal("Protocol does not support --compat.");
|
|
}
|
|
async = NNG_FLAG_NONBLOCK;
|
|
} else {
|
|
if (proto == OPT_PAIR) {
|
|
proto = OPT_PAIR1;
|
|
}
|
|
}
|
|
if (proto == OPT_SUB0) {
|
|
if (topics == NULL) {
|
|
(void) nng_addtopic(topicend, ""); // subscribe to all
|
|
}
|
|
} else {
|
|
if (topics != NULL) {
|
|
fatal("Protocol does not support --subscribe.");
|
|
}
|
|
}
|
|
|
|
switch (proto) {
|
|
case OPT_PULL0:
|
|
case OPT_SUB0:
|
|
if (data != NULL) {
|
|
fatal("Protocol does not support --file or "
|
|
"--data.");
|
|
}
|
|
if (interval >= 0) {
|
|
fatal("Protocol does not support --interval.");
|
|
}
|
|
break;
|
|
case OPT_PUSH0:
|
|
case OPT_PUB0:
|
|
if (format != 0) {
|
|
fatal("Protocol does not support --format "
|
|
"options.");
|
|
}
|
|
if (data == NULL) {
|
|
fatal("Protocol requires either --file or "
|
|
"--data.");
|
|
}
|
|
break;
|
|
case OPT_SURVEY0:
|
|
case OPT_REQ0:
|
|
if (data == NULL) {
|
|
fatal("Protocol requires either --file or "
|
|
"--data.");
|
|
}
|
|
break;
|
|
case OPT_REP0:
|
|
case OPT_RESPOND0:
|
|
if (interval >= 0) {
|
|
fatal("Protocol does not support --interval.");
|
|
}
|
|
break;
|
|
case OPT_PAIR0:
|
|
case OPT_PAIR1:
|
|
case OPT_BUS0:
|
|
if ((interval >= 0) && (data == NULL)) {
|
|
fatal("Option --interval requires either "
|
|
"--file or --data.");
|
|
}
|
|
break;
|
|
default:
|
|
// Will be caught in next switch statement.
|
|
break;
|
|
}
|
|
|
|
switch (proto) {
|
|
case OPT_REQ0:
|
|
rv = nng_req0_open(&sock);
|
|
break;
|
|
case OPT_REP0:
|
|
rv = nng_rep0_open(&sock);
|
|
break;
|
|
case OPT_SUB0:
|
|
rv = nng_sub0_open(&sock);
|
|
break;
|
|
case OPT_PUB0:
|
|
rv = nng_pub0_open(&sock);
|
|
break;
|
|
case OPT_PAIR0:
|
|
rv = nng_pair0_open(&sock);
|
|
break;
|
|
case OPT_PAIR1:
|
|
rv = nng_pair1_open(&sock);
|
|
break;
|
|
case OPT_BUS0:
|
|
#ifdef NNG_HAVE_BUS0
|
|
rv = nng_bus0_open(&sock);
|
|
#else
|
|
fatal("Protocol not supported.");
|
|
#endif
|
|
break;
|
|
case OPT_PUSH0:
|
|
#ifdef NNG_HAVE_PUSH0
|
|
rv = nng_push0_open(&sock);
|
|
#else
|
|
fatal("Protocol not supported.");
|
|
#endif
|
|
break;
|
|
case OPT_PULL0:
|
|
#ifdef NNG_HAVE_PULL0
|
|
rv = nng_pull0_open(&sock);
|
|
#else
|
|
fatal("Protocol not supported.");
|
|
#endif
|
|
break;
|
|
case OPT_SURVEY0:
|
|
#ifdef NNG_HAVE_SURVEYOR0
|
|
rv = nng_surveyor0_open(&sock);
|
|
#else
|
|
fatal("Protocol not supported.");
|
|
#endif
|
|
break;
|
|
case OPT_RESPOND0:
|
|
#ifdef NNG_HAVE_RESPONDENT0
|
|
rv = nng_respondent0_open(&sock);
|
|
#else
|
|
fatal("Protocol not supported");
|
|
#endif
|
|
break;
|
|
case 0:
|
|
default:
|
|
fatal("No protocol specified.");
|
|
break;
|
|
}
|
|
if (rv != 0) {
|
|
fatal("Unable to open socket: %s", nng_strerror(rv));
|
|
}
|
|
|
|
for (struct topic *t = topics; t != NULL; t = t->next) {
|
|
rv = nng_socket_set(
|
|
sock, NNG_OPT_SUB_SUBSCRIBE, t->val, strlen(t->val));
|
|
if (rv != 0) {
|
|
fatal("Unable to subscribe to topic %s: %s", t->val,
|
|
nng_strerror(rv));
|
|
}
|
|
}
|
|
|
|
if ((sendtimeo > 0) &&
|
|
((rv = nng_socket_set_ms(sock, NNG_OPT_SENDTIMEO, sendtimeo)) !=
|
|
0)) {
|
|
fatal("Unable to set send timeout: %s", nng_strerror(rv));
|
|
}
|
|
if ((recvtimeo > 0) &&
|
|
((rv = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, recvtimeo)) !=
|
|
0)) {
|
|
fatal("Unable to set send timeout: %s", nng_strerror(rv));
|
|
}
|
|
|
|
if ((recvmaxsz >= 0) &&
|
|
((rv = nng_socket_set_size(sock, NNG_OPT_RECVMAXSZ, recvmaxsz)) !=
|
|
0)) {
|
|
fatal("Unable to set max receive size: %s", nng_strerror(rv));
|
|
}
|
|
|
|
for (struct addr *a = addrs; a != NULL; a = a->next) {
|
|
char * act;
|
|
nng_listener l;
|
|
nng_dialer d;
|
|
nng_tls_config *tls;
|
|
switch (a->mode) {
|
|
case OPT_DIAL:
|
|
case OPT_DIAL_IPC:
|
|
case OPT_DIAL_LOCAL:
|
|
rv = nng_dialer_create(&d, sock, a->val);
|
|
if (rv != 0) {
|
|
fatal("Unable to create dialer for %s: %s",
|
|
a->val, nng_strerror(rv));
|
|
}
|
|
rv = nng_dialer_get_ptr(
|
|
d, NNG_OPT_TLS_CONFIG, (void **) &tls);
|
|
if (rv == 0) {
|
|
configtls(tls);
|
|
} else if (rv != NNG_ENOTSUP) {
|
|
fatal("Unable to get TLS config: %s",
|
|
nng_strerror(rv));
|
|
}
|
|
if (zthome != NULL) {
|
|
rv = nng_dialer_set(d, NNG_OPT_ZT_HOME,
|
|
zthome, strlen(zthome) + 1);
|
|
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
|
|
fatal("Unable to set ZT home: %s",
|
|
nng_strerror(rv));
|
|
}
|
|
}
|
|
rv = nng_dialer_start(d, async);
|
|
act = "dial";
|
|
if ((rv == 0) && (verbose == OPT_VERBOSE)) {
|
|
char ustr[256];
|
|
size_t sz;
|
|
sz = sizeof(ustr);
|
|
if (nng_dialer_get(
|
|
d, NNG_OPT_URL, ustr, &sz) == 0) {
|
|
printf("Connected to: %s\n", ustr);
|
|
}
|
|
}
|
|
break;
|
|
case OPT_LISTEN:
|
|
case OPT_LISTEN_IPC:
|
|
case OPT_LISTEN_LOCAL:
|
|
rv = nng_listener_create(&l, sock, a->val);
|
|
if (rv != 0) {
|
|
fatal("Unable to create listener for %s: %s",
|
|
a->val, nng_strerror(rv));
|
|
}
|
|
rv = nng_listener_get_ptr(
|
|
l, NNG_OPT_TLS_CONFIG, (void **) &tls);
|
|
if (rv == 0) {
|
|
configtls(tls);
|
|
} else if (rv != NNG_ENOTSUP) {
|
|
fatal("Unable to get TLS config: %s",
|
|
nng_strerror(rv));
|
|
}
|
|
if (zthome != NULL) {
|
|
rv = nng_listener_set(l, NNG_OPT_ZT_HOME,
|
|
zthome, strlen(zthome) + 1);
|
|
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
|
|
fatal("Unable to set ZT home: %s",
|
|
nng_strerror(rv));
|
|
}
|
|
}
|
|
rv = nng_listener_start(l, async);
|
|
act = "listen";
|
|
if ((rv == 0) && (verbose == OPT_VERBOSE)) {
|
|
char ustr[256];
|
|
size_t sz;
|
|
sz = sizeof(ustr);
|
|
if (nng_listener_get(
|
|
l, NNG_OPT_URL, ustr, &sz) == 0) {
|
|
printf("Listening at: %s\n", ustr);
|
|
}
|
|
}
|
|
break;
|
|
default:
|
|
fatal("Invalid address mode! (Bug!)");
|
|
}
|
|
|
|
if (rv != 0) {
|
|
fatal("Unable to %s on %s: %s", act, a->val,
|
|
nng_strerror(rv));
|
|
}
|
|
}
|
|
|
|
switch (proto) {
|
|
case OPT_SUB0:
|
|
case OPT_PULL0:
|
|
recvloop(sock);
|
|
break;
|
|
case OPT_REP0:
|
|
case OPT_RESPOND0:
|
|
if (data == NULL) {
|
|
recvloop(sock);
|
|
} else {
|
|
resploop(sock);
|
|
}
|
|
break;
|
|
case OPT_PUSH0:
|
|
case OPT_PUB0:
|
|
sendloop(sock);
|
|
break;
|
|
case OPT_BUS0:
|
|
case OPT_PAIR0:
|
|
case OPT_PAIR1:
|
|
if (data != NULL) {
|
|
sendrecv(sock);
|
|
} else {
|
|
recvloop(sock);
|
|
}
|
|
break;
|
|
case OPT_REQ0:
|
|
case OPT_SURVEY0:
|
|
sendrecv(sock);
|
|
break;
|
|
default:
|
|
fatal("Protocol handling unimplemented.");
|
|
}
|
|
|
|
exit(0);
|
|
}
|