2121#include < regex>
2222#include < mutex>
2323
24- #define RSCP2MQTT_VERSION " 3.26 "
24+ #define RSCP2MQTT_VERSION " 3.27 "
2525
2626#define AES_KEY_SIZE 32
2727#define AES_BLOCK_SIZE 32
@@ -505,7 +505,7 @@ void publishImmediately(char *t, char *p, bool influx) {
505505 char topic[TOPIC_SIZE];
506506
507507 snprintf (topic, TOPIC_SIZE, " %s/%s" , cfg.prefix , t);
508- if (mosq) mosquitto_publish (mosq, NULL , topic, strlen (p), p, cfg.mqtt_qos , cfg.mqtt_retain );
508+ if (mosq && ( mosquitto_pub_topic_check (topic) == MOSQ_ERR_SUCCESS) && p && strlen (p) ) mosquitto_publish (mosq, NULL , topic, strlen (p), p, cfg.mqtt_qos , cfg.mqtt_retain );
509509#ifdef INFLUXDB
510510 if (cfg.influxdb_on && curl && influx) {
511511 char buffer[CURL_BUFFER_SIZE];
@@ -867,17 +867,22 @@ void pushAdditionalTag(uint32_t req_container, uint32_t req_tag, int req_index,
867867}
868868
869869bool updateRawData (char *topic, char *payload) {
870- for (std::vector<RSCP_MQTT::mqtt_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
871- if (!strcmp (it->topic , topic)) {
872- if (strcmp (it->payload , payload)) strcpy (it->payload , payload);
873- return (true );
870+ if (topic && payload) {
871+ for (std::vector<RSCP_MQTT::mqtt_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
872+ if (!strcmp (it->topic , topic)) {
873+ if (strcmp (it->payload , payload)) {
874+ if (strlen (it->payload ) != strlen (payload)) it->payload = (char *)realloc (it->payload , strlen (payload) + 1 );
875+ strcpy (it->payload , payload);
876+ }
877+ return (true );
878+ }
874879 }
875880 }
876881 return (false );
877882}
878883
879884void mergeRawData (char *topic, char *payload) {
880- if (!updateRawData (topic, payload)) {
885+ if (topic && payload && !updateRawData (topic, payload)) {
881886 RSCP_MQTT::mqtt_data_t v;
882887 v.topic = strdup (topic);
883888 v.payload = strdup (payload);
@@ -2094,29 +2099,31 @@ void createRequest(SRscpFrameBuffer * frameBuffer) {
20942099}
20952100
20962101void publishRaw (RscpProtocol *protocol, SRscpValue *response, char *topic) {
2097- char *payload_new = (char *)malloc (PAYLOAD_SIZE * sizeof (char ));
2102+ char *payload_new = (char *)malloc (PAYLOAD_SIZE * sizeof (char ) + 1 );
20982103 char *payload_old = readRawData (topic);
2104+ memset (payload_new, 0 , sizeof (payload_new));
20992105 preparePayload (protocol, response, &payload_new);
2100- if (payload_old && strcmp (payload_old, payload_new)) {
2106+ if (payload_old && payload_new && strcmp (payload_new, " " ) && strcmp (payload_old, payload_new)) {
21012107 publishImmediately (topic, payload_new, false );
21022108 updateRawData (topic, payload_new);
2103- } else if (!payload_old) {
2109+ } else if (!payload_old && payload_new && strcmp (payload_new, " " ) ) {
21042110 publishImmediately (topic, payload_new, false );
21052111 mergeRawData (topic, payload_new);
21062112 }
21072113 if (payload_new) free (payload_new);
2108- return ;
2109- }
2114+ return ;
2115+ }
21102116
21112117void handleRaw (RscpProtocol *protocol, SRscpValue *response, uint32_t *cache, int level) {
21122118 int l = level + 1 ;
21132119 char topic[TOPIC_SIZE];
2120+ memset (topic, 0 , sizeof (topic));
21142121
21152122 if (response->dataType == RSCP::eTypeError) return ;
21162123
21172124 if (!l && (response->dataType != RSCP::eTypeContainer)) {
21182125 sprintf (topic, " raw/%s" , tagName (RSCP_TAGS::RscpTagsOverview, response->tag ));
2119- publishRaw (protocol, response, topic);
2126+ if (!cfg. raw_topic_regex || std::regex_match (topic, std::regex (cfg. raw_topic_regex ))) publishRaw (protocol, response, topic);
21202127 return ;
21212128 }
21222129 std::vector<SRscpValue> data = protocol->getValueAsContainer (response);
@@ -2131,11 +2138,13 @@ void handleRaw(RscpProtocol *protocol, SRscpValue *response, uint32_t *cache, in
21312138 strcpy (topic, " raw" );
21322139 for (int i = 0 ; i < RECURSION_MAX_LEVEL; i++) {
21332140 if (cache[i]) {
2134- strcat (topic, " /" );
2135- strcat (topic, tagName (RSCP_TAGS::RscpTagsOverview, cache[i]));
2141+ if (strlen (topic) + strlen (tagName (RSCP_TAGS::RscpTagsOverview, cache[i])) + 2 < TOPIC_SIZE) {
2142+ strcat (topic, " /" );
2143+ strcat (topic, tagName (RSCP_TAGS::RscpTagsOverview, cache[i]));
2144+ } else logMessageByTag (response->tag , data[i].tag , 0 , __LINE__, (char *)" Error: Topic name too long. Container >%s< Tag >%s< [%d]\n " );
21362145 }
21372146 }
2138- publishRaw (protocol, &data[i], topic);
2147+ if (!cfg. raw_topic_regex || std::regex_match (topic, std::regex (cfg. raw_topic_regex ))) publishRaw (protocol, &data[i], topic);
21392148 }
21402149 }
21412150 }
@@ -2185,8 +2194,8 @@ int handleResponseValue(RscpProtocol *protocol, SRscpValue *response) {
21852194
21862195 if (cfg.raw_mode ) {
21872196 for (int i = 0 ; i < RECURSION_MAX_LEVEL; i++) cache[i] = 0 ;
2188- handleRaw (protocol, response, cache, -1 );
2189- }
2197+ if (mosq) handleRaw (protocol, response, cache, -1 );
2198+ }
21902199
21912200 // check the SRscpValue TAG to detect which response it is
21922201 switch (response->tag ) {
@@ -2740,7 +2749,7 @@ static void mainLoop(void) {
27402749 if (mosq) {
27412750 char topic[TOPIC_SIZE];
27422751 snprintf (topic, TOPIC_SIZE, " %s/rscp2mqtt/status" , cfg.prefix );
2743- mosquitto_threaded_set (mosq, true );
2752+ // mosquitto_threaded_set(mosq, true); // necessary?
27442753 if (cfg.mqtt_tls && cfg.mqtt_tls_password ) {
27452754 if (mosquitto_tls_set (mosq, cfg.mqtt_tls_cafile , cfg.mqtt_tls_capath , cfg.mqtt_tls_certfile , cfg.mqtt_tls_keyfile , mqttCallbackTlsPassword) != MOSQ_ERR_SUCCESS) {
27462755 logMessage (cfg.logfile , (char *)__FILE__, __LINE__, (char *)" Error: Unable to set TLS options.\n " );
@@ -2755,8 +2764,10 @@ static void mainLoop(void) {
27552764 if (cfg.mqtt_auth && strcmp (cfg.mqtt_user , " " ) && strcmp (cfg.mqtt_password , " " )) mosquitto_username_pw_set (mosq, cfg.mqtt_user , cfg.mqtt_password );
27562765 mosquitto_will_set (mosq, topic, strlen (" disconnected" ), " disconnected" , cfg.mqtt_qos , cfg.mqtt_retain );
27572766 if (!mosquitto_connect (mosq, cfg.mqtt_host , cfg.mqtt_port , 10 )) {
2758- std::thread th (mqttListener, mosq);
2759- th.detach ();
2767+ if (!cfg.once ) {
2768+ std::thread th (mqttListener, mosq);
2769+ th.detach ();
2770+ }
27602771 if (cfg.verbose ) logMessage (cfg.logfile , (char *)__FILE__, __LINE__, (char *)" Success: MQTT broker connected.\n " );
27612772 } else {
27622773 logMessage (cfg.logfile , (char *)__FILE__, __LINE__, (char *)" Error: MQTT broker connection failed.\n " );
@@ -2935,6 +2946,7 @@ int main(int argc, char *argv[]) {
29352946 strcpy (cfg.true_value , " true" );
29362947 strcpy (cfg.false_value , " false" );
29372948 cfg.raw_mode = false ;
2949+ cfg.raw_topic_regex = NULL ;
29382950
29392951 // signal handler
29402952 signal (SIGINT, signal_handler);
@@ -3109,6 +3121,8 @@ int main(int argc, char *argv[]) {
31093121#endif
31103122 else if ((strcasecmp (key, " RAW_MODE" ) == 0 ) && (strcasecmp (value, " true" ) == 0 ))
31113123 cfg.raw_mode = true ;
3124+ else if (strcasecmp (key, " RAW_TOPIC_REGEX" ) == 0 )
3125+ cfg.raw_topic_regex = strdup (value);
31123126 else if (strncasecmp (key, " ADD_NEW_REQUEST" , strlen (" ADD_NEW_REQUEST" )) == 0 ) {
31133127 int order = 0 ;
31143128 int index = -1 ;
@@ -3467,6 +3481,10 @@ int main(int argc, char *argv[]) {
34673481 RSCP_MQTT::IdlePeriodCache.clear ();
34683482 RSCP_MQTT::ErrorCache.clear ();
34693483
3484+ // MQTT disconnect
3485+ mosquitto_disconnect (mosq);
3486+ mosq = NULL ;
3487+
34703488 // MQTT cleanup
34713489 mosquitto_lib_cleanup ();
34723490
@@ -3486,6 +3504,7 @@ int main(int argc, char *argv[]) {
34863504 if (cfg.mqtt_tls_certfile ) free (cfg.mqtt_tls_certfile );
34873505 if (cfg.mqtt_tls_keyfile ) free (cfg.mqtt_tls_keyfile );
34883506 if (cfg.mqtt_tls_password ) free (cfg.mqtt_tls_password );
3507+ if (cfg.raw_topic_regex ) free (cfg.raw_topic_regex );
34893508
34903509 exit (EXIT_SUCCESS);
34913510}
0 commit comments