support timescaledb

为了支持同时写入pg 和 timescaledb, 在 work 结构体里面建立了不同的数据库
连接,理论上其实可以复用 pg 的数据库连接, 为了简单实现,大部分代码和pg都
是重复的。

为了使用时序库,把 timestamp 字段从 int 改成 TIMESTAMPTZ,
同时担心idx字段 int 不够用,改成了 bigserial 字段, 并且由于 timescaledb
的限制,idx 就不能是主键了。

由于compose_sql_clause 是拼接的sql字符串,导致没有使用 prepare 语句提升写入性能。
感觉 compose_sql_clause 过多的使用 memset(tmp, 0, 800) 导致一次 sql 写入
频繁的调用 memset。

Signed-off-by: huchangqi <huchangqiqi@gmail.com>
This commit is contained in:
huchangqiqi 2024-08-29 15:47:30 +08:00 committed by Jaylin
parent f0eec03fa4
commit 803319759c
8 changed files with 414 additions and 12 deletions

View File

@ -33,6 +33,7 @@ option (ENABLE_JWT "Enable jwt library" OFF)
option (ENABLE_RULE_ENGINE "Enable rule engine" OFF)
option (ENABLE_MYSQL "Enable MYSQL" OFF)
option (ENABLE_POSTGRESQL "Enable POSTGRESQL" OFF)
option (ENABLE_TIMESCALEDB "Enable TIMESCALEDB" OFF)
option (ENABLE_AWS_BRIDGE "Enable aws bridge" OFF)
option (ENABLE_SYSLOG "Enable syslog" ON)
option (ENABLE_PARQUET "Enable parquet" OFF)
@ -368,6 +369,9 @@ if (ENABLE_POSTGRESQL)
add_definitions(-DSUPP_POSTGRESQL)
endif (ENABLE_POSTGRESQL)
if (ENABLE_TIMESCALEDB)
add_definitions(-DSUPP_TIMESCALEDB)
endif (ENABLE_TIMESCALEDB)
add_subdirectory(nng)

View File

@ -98,6 +98,18 @@ if(ENABLE_POSTGRESQL)
endif(ENABLE_POSTGRESQL)
if(ENABLE_TIMESCALEDB)
find_package(PostgreSQL REQUIRED)
# Link to the PostgreSQL C library
target_link_libraries(nanomq ${PostgreSQL_LIBRARIES})
# Include the PostgreSQL C library headers
target_include_directories(nanomq PUBLIC ${PostgreSQL_INCLUDE_DIRS})
endif(ENABLE_TIMESCALEDB)
if(ENABLE_AWS_BRIDGE)
target_link_libraries(nanomq
aws_iot_mqtt

View File

@ -949,6 +949,11 @@ proto_work_init(nng_socket sock, nng_socket extrasock, uint8_t proto,
w->pgconn = NULL;
#endif
#if defined(SUPP_TIMESCALEDB)
w->tsconn = NULL;
#endif
#if defined(NNG_SUPP_SQLITE)
nng_socket_get_ptr(sock, NMQ_OPT_MQTT_QOS_DB, &w->sqlite_db);
#endif
@ -1042,6 +1047,12 @@ broker(conf *nanomq_conf)
}
#endif
#if defined(SUPP_TIMESCALEDB)
if (cr->option & RULE_ENG_TDB) {
nanomq_client_timescaledb(cr, false);
}
#endif
#if defined(FDB_SUPPORT)
if (cr->option & RULE_ENG_FDB) {
pthread_t netThread;

View File

@ -62,6 +62,9 @@ struct work {
#if defined(SUPP_POSTGRESQL)
void *pgconn;
#endif
#if defined(SUPP_TIMESCALEDB)
void *tsconn;
#endif
#if defined(SUPP_PLUGIN)
property *user_property;

View File

@ -15,6 +15,7 @@ extern int nano_client_publish(nng_socket *sock, const char *topic,
extern int nanomq_client_sqlite(conf_rule *cr, bool init_last);
extern int nanomq_client_mysql(conf_rule *cr, bool init_last);
extern int nanomq_client_postgresql(conf_rule *cr, bool init_last);
extern int nanomq_client_timescaledb(conf_rule *cr, bool init_last);
#endif
#endif // NANOMQ_RULE_H

View File

@ -24,6 +24,18 @@ static char *key_arr[] = {
"Payload",
};
#if defined(SUPP_TIMESCALEDB)
static char *ts_type_arr[] = {
" INT",
" INT",
" TEXT",
" TEXT",
" TEXT",
" TEXT",
" TIMESTAMPTZ NOT NULL",
" TEXT",
};
#endif
static char *type_arr[] = {
" INT",
@ -194,12 +206,106 @@ nanomq_client_sqlite(conf_rule *cr, bool init_last)
#endif
#if defined(SUPP_POSTGRESQL)
#if defined(SUPP_POSTGRESQL) || defined(SUPP_TIMESCALEDB)
#include <libpq-fe.h>
#endif
#if defined(SUPP_TIMESCALEDB)
static int ts_finish_with_error(PGconn *conn, rule *rules, int index)
{
log_error("timescaledb %s", PQerrorMessage(conn));
PQfinish(conn);
rule *r = &rules[index];
rule_timescaledb_free(r->timescaledb);
rule_free(r);
cvector_erase(rules, index);
return -1;
}
int
nanomq_client_timescaledb(conf_rule *cr, bool init_last)
{
int rc = 0;
char timescaledb_table[1024];
for (int i = 0; i < cvector_size(cr->rules); i++) {
if (init_last && i != cvector_size(cr->rules) - 1) {
continue;
}
if (RULE_FORWORD_TIMESCALEDB == cr->rules[i].forword_type) {
int index = 0;
char table[256] = { 0 };
snprintf(table, 128,
"CREATE TABLE IF NOT EXISTS %s("
"idx BIGSERIAL",
cr->rules[i].timescaledb->table);
char *err_msg = NULL;
bool first = true;
for (; index < 8; index++) {
if (!cr->rules[i].flag[index])
continue;
strcat(table, ", ");
strcat(table,
cr->rules[i].as[index]
? cr->rules[i].as[index]
: key_arr[index]);
strcat(table, ts_type_arr[index]);
}
strcat(table, ");");
rule_timescaledb *timescaledb = cr->rules[i].timescaledb;
char conninfo[256] = { 0 };
snprintf(conninfo , 128, "dbname=postgres user=%s password=%s host=%s port=5432", timescaledb->username, timescaledb->password, timescaledb->host);
PGconn *conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
rc = ts_finish_with_error(conn, cr->rules, i--);
continue;
}
timescaledb->conn = conn;
PGresult *res = PQexec(conn, table);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
rc = ts_finish_with_error(conn, cr->rules, i--);
continue;
}
PQclear(res);
char hypertable[256] = {0};
snprintf(hypertable, 128,
"SELECT create_hypertable('%s', 'timestamp', if_not_exists => TRUE);",
cr->rules[i].timescaledb->table);
res = PQexec(conn, hypertable);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
rc = ts_finish_with_error(conn, cr->rules, i--);
continue;
}
PQclear(res);
}
}
return rc;
}
#endif
#if defined(SUPP_POSTGRESQL)
static int pg_finish_with_error(PGconn *conn, rule *rules, int index)
{
log_error("%s", PQerrorMessage(conn));
log_error("Postgresql %s", PQerrorMessage(conn));
PQfinish(conn);
rule *r = &rules[index];
rule_postgresql_free(r->postgresql);
@ -226,6 +332,7 @@ nanomq_client_postgresql(conf_rule *cr, bool init_last)
"CREATE TABLE IF NOT EXISTS %s("
"idx SERIAL PRIMARY KEY",
cr->rules[i].postgresql->table);
char *err_msg = NULL;
bool first = true;

View File

@ -14,7 +14,7 @@
#include <mysql.h>
#endif
#if defined(SUPP_POSTGRESQL)
#if defined(SUPP_POSTGRESQL) || defined (SUPP_TIMESCALEDB)
#include <libpq-fe.h>
#endif
@ -715,20 +715,31 @@ compose_sql_clause(rule *info, char *key, char *value, bool is_need_set, int j,
} else {
strcat(key, "Username");
}
memset(tmp, 0, 800);
sprintf(tmp, "%s\'%s\'", value, username);
strcpy(value, tmp);
if (username == NULL) {
strcat(value, "NULL");
} else {
memset(tmp, 0, 800);
sprintf(tmp, "%s\'%s\'", value, username);
strcpy(value, tmp);
}
break;
case RULE_PASSWORD:;
char *password = (char *) conn_param_get_password(cp);
if (info->as[j]) {
strcat(key, info->as[j]);
} else {
strcat(key, "Password");
}
memset(tmp, 0, 800);
sprintf(tmp, "%s\'%s\'", value, password);
strcpy(value, tmp);
if (password == NULL) {
strcat(value, "NULL");
} else {
memset(tmp, 0, 800);
sprintf(tmp, "%s\'%s\'", value, password);
strcpy(value, tmp);
}
break;
case RULE_TIMESTAMP:
if (info->as[j]) {
@ -738,7 +749,11 @@ compose_sql_clause(rule *info, char *key, char *value, bool is_need_set, int j,
}
memset(tmp, 0, 800);
sprintf(tmp, "%s%lu", value, (unsigned long) time(NULL));
if (RULE_FORWORD_TIMESCALEDB == info->forword_type) {
sprintf(tmp, "%sto_timestamp(%lu)", value, (unsigned long) time(NULL));
} else {
sprintf(tmp, "%s%lu", value, (unsigned long) time(NULL));
}
strcpy(value, tmp);
break;
case RULE_PAYLOAD_ALL:;
@ -774,6 +789,8 @@ compose_sql_clause(rule *info, char *key, char *value, bool is_need_set, int j,
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s INT;\n", info->mysql->table, info->payload[pi]->pas);
} else if (RULE_FORWORD_POSTGRESQL == info->forword_type) {
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s INT;\n", info->postgresql->table, info->payload[pi]->pas);
} else if (RULE_FORWORD_TIMESCALEDB == info->forword_type) {
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s INT;\n", info->timescaledb->table, info->payload[pi]->pas);
}
}
strcat(key, info->payload[pi]->pas);
@ -797,6 +814,8 @@ compose_sql_clause(rule *info, char *key, char *value, bool is_need_set, int j,
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s TEXT;\n", info->mysql->table, info->payload[pi]->pas);
} else if (RULE_FORWORD_POSTGRESQL == info->forword_type) {
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s TEXT;\n", info->postgresql->table, info->payload[pi]->pas);
} else if (RULE_FORWORD_TIMESCALEDB == info->forword_type) {
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s TEXT;\n", info->timescaledb->table, info->payload[pi]->pas);
}
}
strcat(key, info->payload[pi]->pas);
@ -821,7 +840,9 @@ compose_sql_clause(rule *info, char *key, char *value, bool is_need_set, int j,
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s TEXT;\n", info->mysql->table, info->payload[pi]->pas);
} else if (RULE_FORWORD_POSTGRESQL == info->forword_type) {
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s TEXT;\n", info->postgresql->table, info->payload[pi]->pas);
}
} else if (RULE_FORWORD_TIMESCALEDB == info->forword_type) {
snprintf(tmp_key, 128, "ALTER TABLE %s ADD %s TEXT;\n", info->timescaledb->table, info->payload[pi]->pas);
}
}
strcat(key, info->payload[pi]->pas);
strcat(key, ", ");
@ -881,6 +902,8 @@ rule_engine_insert_sql(nano_work *work)
bool is_need_set_mysql = false;
static bool is_first_time_postgresql = true;
bool is_need_set_postgresql = false;
static bool is_first_time_timescaledb = true;
bool is_need_set_timescaledb = false;
nng_mtx *rule_mutex = work->config->rule_eng.rule_mutex;
@ -1203,6 +1226,107 @@ rule_engine_insert_sql(nano_work *work)
}
#endif
#if defined(SUPP_TIMESCALEDB)
if (RULE_ENG_TDB & work->config->rule_eng.option && RULE_FORWORD_TIMESCALEDB == rules[i].forword_type) {
if (work->tsconn == NULL) {
rule_timescaledb *timescaledb = rules[i].timescaledb;
char conninfo[256] = { 0 };
snprintf(conninfo , 128, "dbname=postgres user=%s password=%s host=%s port=5432", timescaledb->username, timescaledb->password, timescaledb->host);
PGconn *conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
log_error("timescaledb error %s", PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
work->tsconn = conn;
}
char sql_clause[1024] = "INSERT INTO ";
char key[128] = { 0 };
snprintf(key, 128, "%s (", rules[i].timescaledb->table);
char value[800] = "VALUES (";
for (size_t j = 0; j < 9; j++) {
nng_mtx_lock(rule_mutex);
if (true == is_first_time_timescaledb) {
is_need_set_timescaledb = true;
}
char *ret =
compose_sql_clause(&rules[i],
key, value, is_need_set_timescaledb, j, work);
if (ret && is_need_set_timescaledb) {
is_need_set_timescaledb = false;
log_debug("ret - %s", ret);
char *p = ret;
char *p_b = ret;
while (NULL != p) {
char *p = strchr(p_b, '\n');
if (NULL != p) {
*p = '\0';
log_debug("p_b %s", p_b);
PGresult *res = PQexec(work->tsconn, p_b);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
log_debug("timescaledb error %s\n", PQerrorMessage(work->tsconn));
fprintf(stderr, "%s\n", PQerrorMessage(work->tsconn));
}
PQclear(res);
p_b = ++p;
} else {
break;
}
}
free(ret);
ret = NULL;
}
if (true == is_first_time_timescaledb) {
is_first_time_timescaledb = false;
}
nng_mtx_unlock(rule_mutex);
}
/* log_debug("%s", key); */
/* log_debug("%s", value); */
char *p = strrchr(key, ',');
*p = ')';
p = strrchr(value, ',');
*p = ')';
strcat(sql_clause, key);
strcat(sql_clause, value);
strcat(sql_clause, ";");
log_debug("%s", sql_clause);
PGresult *res = PQexec(work->tsconn, sql_clause);
log_debug("timescaledb res: %d\n", PQresultStatus(res));
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
log_debug("timescaledb error %s\n", PQerrorMessage(work->tsconn));
fprintf(stderr, "timescaledb error %s\n", PQerrorMessage(work->tsconn));
PQclear(res);
PQfinish(work->tsconn);
exit(1);
}
PQclear(res);
}
#endif
}
}

View File

@ -1704,6 +1704,68 @@ post_rules_sqlite(conf_rule *cr, cJSON *jso_params, char *rawsql)
}
#endif
#if defined(SUPP_TIMESCALEDB) && defined(SUPP_RULE_ENGINE)
static int
post_rules_timescaledb(conf_rule *cr, cJSON *jso_params, char *rawsql)
{
cJSON *jso_param = NULL;
rule_timescaledb *timescaledb = rule_timescaledb_init();
cJSON_ArrayForEach(jso_param, jso_params)
{
if (jso_param) {
if (!nng_strcasecmp(jso_param->string, "table")) {
timescaledb->table =
nng_strdup(jso_param->valuestring);
log_debug(
"table: %s\n", jso_param->valuestring);
} else if (!nng_strcasecmp(
jso_param->string, "username")) {
timescaledb->username =
nng_strdup(jso_param->valuestring);
log_debug(
"username: %s\n", jso_param->valuestring);
} else if (!nng_strcasecmp(
jso_param->string, "password")) {
timescaledb->password =
nng_strdup(jso_param->valuestring);
log_debug(
"password: %s\n", jso_param->valuestring);
} else if (!nng_strcasecmp(
jso_param->string, "host")) {
timescaledb->host =
nng_strdup(jso_param->valuestring);
log_debug(
"host: %s\n", jso_param->valuestring);
} else {
rule_timescaledb_free(timescaledb);
log_error("Unsupport key word!");
return REQ_PARAM_ERROR;
}
}
}
if (false == rule_timescaledb_check(timescaledb)) {
rule_timescaledb_free(timescaledb);
return MISSING_KEY_REQUEST_PARAMES;
}
rule_sql_parse(cr, rawsql);
cr->rules[cvector_size(cr->rules) - 1].forword_type =
RULE_FORWORD_TIMESCALEDB;
cr->rules[cvector_size(cr->rules) - 1].timescaledb = timescaledb;
cr->rules[cvector_size(cr->rules) - 1].raw_sql = nng_strdup(rawsql);
cr->rules[cvector_size(cr->rules) - 1].enabled = true;
cr->rules[cvector_size(cr->rules) - 1].rule_id =
rule_generate_rule_id();
if (-1 == nanomq_client_timescaledb(cr, true)) {
return REQ_PARAM_ERROR;
}
cr->option |= RULE_ENG_TDB;
return SUCCEED;
}
#endif
#if defined(SUPP_POSTGRESQL) && defined(SUPP_RULE_ENGINE)
static int
post_rules_postgresql(conf_rule *cr, cJSON *jso_params, char *rawsql)
@ -1972,6 +2034,13 @@ post_rules(http_msg *msg)
SUCCEED) {
goto error;
}
#endif
#if defined(SUPP_TIMESCALEDB)
} else if (!strcasecmp(name, "timescaledb")) {
if ((rc = post_rules_timescaledb(cr, jso_params, rawsql)) !=
SUCCEED) {
goto error;
}
#endif
} else {
log_error("Unsupport forword type !");
@ -2199,6 +2268,58 @@ put_rules_postgresql_parse(cJSON *jso_params, rule_postgresql *postgresql)
return SUCCEED;
}
static int
put_rules_timescaledb_parse(cJSON *jso_params, rule_timescaledb *timescaledb)
{
cJSON *jso_param = NULL;
cJSON_ArrayForEach(jso_param, jso_params)
{
if (jso_param) {
if (!nng_strcasecmp(jso_param->string, "table")) {
if (timescaledb->table) {
nng_strfree(timescaledb->table);
}
timescaledb->table =
nng_strdup(jso_param->valuestring);
log_debug(
"table: %s\n", jso_param->valuestring);
} else if (!nng_strcasecmp(
jso_param->string, "username")) {
if (timescaledb->username) {
nng_strfree(timescaledb->username);
}
timescaledb->username =
nng_strdup(jso_param->valuestring);
log_debug(
"username: %s\n", jso_param->valuestring);
} else if (!nng_strcasecmp(
jso_param->string, "password")) {
if (timescaledb->password) {
nng_strfree(timescaledb->password);
}
timescaledb->password =
nng_strdup(jso_param->valuestring);
log_debug(
"password: %s\n", jso_param->valuestring);
} else if (!nng_strcasecmp(
jso_param->string, "host")) {
if (timescaledb->host) {
nng_strfree(timescaledb->host);
}
timescaledb->host =
nng_strdup(jso_param->valuestring);
log_debug(
"host: %s\n", jso_param->valuestring);
} else {
log_error("Unsupport key word!");
return REQ_PARAM_ERROR;
}
}
}
return SUCCEED;
}
static int
put_rules_update_action(cJSON *jso_actions, rule *new_rule, conf_rule *cr)
{
@ -2255,7 +2376,17 @@ put_rules_update_action(cJSON *jso_actions, rule *new_rule, conf_rule *cr)
rule_postgresql_free(postgresql);
return rc;
}
} else if (!strcasecmp(name, "timescaledb")) {
if (new_rule->forword_type != RULE_FORWORD_TIMESCALEDB) {
log_error("Unsupport change from other type to timescaledb");
return REQ_PARAM_ERROR;
}
rule_timescaledb *timescaledb = new_rule->timescaledb;
rc = put_rules_timescaledb_parse(jso_params, timescaledb);
if (rc != SUCCEED) {
rule_timescaledb_free(timescaledb);
return rc;
}
} else {
log_debug("Unsupport forword type !");
return REQ_PARAM_ERROR;
@ -2329,6 +2460,9 @@ put_rules(http_msg *msg, kv **params, size_t param_num, const char *rule_id)
case RULE_FORWORD_POSTGRESQL:
new_rule->postgresql = cr->rules[i].postgresql;
break;
case RULE_FORWORD_TIMESCALEDB:
new_rule->timescaledb = cr->rules[i].timescaledb;
break;
case RULE_FORWORD_SQLITE:
new_rule->sqlite_table = cr->rules[i].sqlite_table;
break;
@ -2438,6 +2572,9 @@ delete_rules(http_msg *msg, kv **params, size_t param_num, const char *rule_id)
case RULE_FORWORD_POSTGRESQL:
rule_postgresql_free(re->postgresql);
break;
case RULE_FORWORD_TIMESCALEDB:
rule_timescaledb_free(re->timescaledb);
break;
case RULE_FORWORD_REPUB:
rule_repub_free(re->repub);
break;
@ -2499,6 +2636,9 @@ get_rules_helper(cJSON *data, rule *r)
case RULE_FORWORD_POSTGRESQL:
forword_type = "postgresql";
break;
case RULE_FORWORD_TIMESCALEDB:
forword_type = "timescaledb";
break;
default:
break;
}