2222#include < regex>
2323#include < mutex>
2424
25- #define RSCP2MQTT_VERSION " 3.33 "
25+ #define RSCP2MQTT_VERSION " 3.34 "
2626
2727#define AES_KEY_SIZE 32
2828#define AES_BLOCK_SIZE 32
@@ -856,15 +856,15 @@ void pushNotSupportedTag(uint32_t container, uint32_t tag) {
856856 return ;
857857}
858858
859- bool existsAdditionalTag (uint32_t container, uint32_t tag, int index) {
859+ bool existsAdditionalTag (uint32_t container, uint32_t tag, int index, int order ) {
860860 for (std::vector<RSCP_MQTT::additional_tags_t >::iterator it = RSCP_MQTT::AdditionalTags.begin (); it != RSCP_MQTT::AdditionalTags.end (); ++it) {
861- if ((it->req_container == container) && (it->req_tag == tag) && (it->req_index == index)) return (true );
862- }
861+ if ((it->req_container == container) && (it->req_tag == tag) && (it->req_index == index) && (it-> order == order)) return (true );
862+ }
863863 return (false );
864864}
865865
866866void pushAdditionalTag (uint32_t req_container, uint32_t req_tag, int req_index, int order, bool one_shot) {
867- if (existsAdditionalTag (req_container, req_tag, req_index)) return ;
867+ if (existsAdditionalTag (req_container, req_tag, req_index, order )) return ;
868868 RSCP_MQTT::additional_tags_t v;
869869 v.req_container = req_container;
870870 v.req_tag = req_tag;
@@ -876,38 +876,40 @@ void pushAdditionalTag(uint32_t req_container, uint32_t req_tag, int req_index,
876876 return ;
877877}
878878
879- bool updateRawData (char *topic, char *payload) {
879+ void initRawData () {
880+ for (std::vector<RSCP_MQTT::raw_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
881+ it->handled = false ;
882+ it->changed = false ;
883+ }
884+ return ;
885+ }
886+
887+ int mergeRawData (char *topic, char *payload, bool *changed) {
888+ int i = 0 ;
880889 if (topic && payload) {
881- for (std::vector<RSCP_MQTT::mqtt_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
882- if (!strcmp (it->topic , topic)) {
890+ for (std::vector<RSCP_MQTT::raw_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
891+ if (!strcmp (it->topic , topic) && !it-> handled && (i == it-> nr ) ) {
883892 if (strcmp (it->payload , payload)) {
884893 if (strlen (it->payload ) != strlen (payload)) it->payload = (char *)realloc (it->payload , strlen (payload) + 1 );
885894 strcpy (it->payload , payload);
895+ it->changed = true ;
896+ *changed = true ;
886897 }
887- return (true );
898+ it->handled = true ;
899+ return (i);
888900 }
901+ if (!strcmp (it->topic , topic) && it->handled ) i++;
889902 }
890- }
891- return (false );
892- }
893-
894- void insertRawData (char *topic, char *payload) {
895- if (topic && payload) {
896- RSCP_MQTT::mqtt_data_t v;
903+ RSCP_MQTT::raw_data_t v;
897904 v.topic = strdup (topic);
898905 v.payload = strdup (payload);
906+ v.handled = true ;
907+ v.changed = true ;
908+ v.nr = i;
899909 RSCP_MQTT::rawData.push_back (v);
910+ *changed = true ;
900911 }
901- return ;
902- }
903-
904- char *readRawData (char *topic) {
905- for (std::vector<RSCP_MQTT::mqtt_data_t >::iterator it = RSCP_MQTT::rawData.begin (); it != RSCP_MQTT::rawData.end (); ++it) {
906- if (!strcmp (it->topic , topic)) {
907- return (it->payload );
908- }
909- }
910- return (NULL );
912+ return (i);
911913}
912914
913915void refreshCache (std::vector<RSCP_MQTT::cache_t > & v, char *payload) {
@@ -1085,6 +1087,13 @@ float getFloatValue(std::vector<RSCP_MQTT::cache_t> & c, uint32_t container, uin
10851087 return (value);
10861088}
10871089
1090+ void resetHandleFlag (std::vector<RSCP_MQTT::cache_t > & c) {
1091+ for (std::vector<RSCP_MQTT::cache_t >::iterator it = c.begin (); it != c.end (); ++it) {
1092+ it->handled = false ;
1093+ }
1094+ return ;
1095+ }
1096+
10881097void preparePayload (RscpProtocol *protocol, SRscpValue *response, char **buf) {
10891098 switch (response->dataType ) {
10901099 case RSCP::eTypeBool: {
@@ -1143,8 +1152,7 @@ int storeResponseValue(std::vector<RSCP_MQTT::cache_t> & c, RscpProtocol *protoc
11431152 int rc = -1 ;
11441153
11451154 for (std::vector<RSCP_MQTT::cache_t >::iterator it = c.begin (); it != c.end (); ++it) {
1146- if ((it->container > container) && (it->tag > response->tag )) break ;
1147- if ((!it->container || (it->container == container)) && (it->tag == response->tag ) && (it->index == index)) {
1155+ if ((!it->container || (it->container == container)) && (it->tag == response->tag ) && (it->index == index) && !it->handled ) {
11481156 switch (response->dataType ) {
11491157 case RSCP::eTypeBool: {
11501158 if (protocol->getValueAsBool (response)) strcpy (buf, " true" );
@@ -1287,6 +1295,8 @@ int storeResponseValue(std::vector<RSCP_MQTT::cache_t> & c, RscpProtocol *protoc
12871295 if ((atoi (it->payload ) == 0 ) && (battery_soc > 1 )) snprintf (it->payload , PAYLOAD_SIZE, " %d" , battery_soc--);
12881296 else battery_soc = atoi (it->payload );
12891297 }
1298+ it->handled = true ;
1299+ break ;
12901300 }
12911301 }
12921302 return (rc);
@@ -2122,21 +2132,24 @@ void createRequest(SRscpFrameBuffer * frameBuffer) {
21222132 return ;
21232133}
21242134
2125- void publishRaw (RscpProtocol *protocol, SRscpValue *response, char *topic) {
2126- char *payload_new = (char *)malloc (PAYLOAD_SIZE * sizeof (char ) + 1 );
2127- char *payload_old = readRawData (topic);
2128- memset (payload_new, 0 , PAYLOAD_SIZE);
2129- preparePayload (protocol, response, &payload_new);
2130- if (payload_old && payload_new && strcmp (payload_new, " " ) && strcmp (payload_old, payload_new)) {
2131- publishImmediately (topic, payload_new, false );
2132- updateRawData (topic, payload_new);
2133- } else if (!payload_old && payload_new && strcmp (payload_new, " " )) {
2134- publishImmediately (topic, payload_new, false );
2135- insertRawData (topic, payload_new);
2136- }
2137- if (payload_new) free (payload_new);
2138- return ;
2139- }
2135+ void publishRaw (RscpProtocol *protocol, SRscpValue *response, char *topic_in) {
2136+ char topic[TOPIC_SIZE];
2137+ char *payload = (char *)malloc (PAYLOAD_SIZE * sizeof (char ) + 1 );
2138+ bool changed = false ;
2139+ memset (payload, 0 , PAYLOAD_SIZE);
2140+ preparePayload (protocol, response, &payload);
2141+
2142+ int nr = mergeRawData (topic_in, payload, &changed);
2143+ if (nr > 0 ) {
2144+ if (snprintf (topic, TOPIC_SIZE, " %s/%d" , topic_in, nr) >= TOPIC_SIZE) {
2145+ logMessage (cfg.logfile , (char *)__FILE__, __LINE__, (char *)" publishRaw: Buffer overflow\n " );
2146+ return ;
2147+ }
2148+ if (changed) publishImmediately (topic, payload, false );
2149+ } else if (changed) publishImmediately (topic_in, payload, false );
2150+ if (payload) free (payload);
2151+ return ;
2152+ }
21402153
21412154void handleRaw (RscpProtocol *protocol, SRscpValue *response, uint32_t *cache, int level) {
21422155 int l = level + 1 ;
@@ -2612,6 +2625,7 @@ static int processReceiveBuffer(const unsigned char * ucBuffer, int iLength) {
26122625 battery_nr = 0 ;
26132626 pm_nr = 0 ;
26142627 wb_nr = 0 ;
2628+ if (cfg.raw_mode ) initRawData ();
26152629 for (size_t i = 0 ; i < frame.data .size (); i++)
26162630 handleResponseValue (&protocol, &frame.data [i]);
26172631
@@ -2744,6 +2758,8 @@ static void mainLoop(void) {
27442758
27452759 gettimeofday (&start, NULL );
27462760
2761+ resetHandleFlag (RSCP_MQTT::RscpMqttCache);
2762+
27472763 // create an RSCP frame with requests to some example data
27482764 createRequest (&frameBuffer);
27492765
0 commit comments