#define _CRT_SECURE_NO_DEPRECATE #ifndef NOMQTT #include "MQTTAsync.h" #ifndef WIN32 #include #endif #include "CHeaders.h" #include "asmstrucs.h" #include "mqtt.h" extern int MQTT_Connecting; extern int MQTT_Connected; DllExport int APIENTRY MQTTSend(char * topic, char * Msg, int MsgLen); MQTTAsync client = NULL; time_t MQTTLastStatus = 0; void MQTTSendStatus() { char topic[256]; char payload[128]; sprintf(topic, "PACKETNODE/%s", NODECALLLOPPED); strcpy(payload,"{\"status\":\"online\"}"); MQTTSend(topic, payload, strlen(payload)); MQTTLastStatus = time(NULL); } void MQTTTimer() { if (MQTT_Connecting == 0 && MQTT_Connected == 0) MQTTConnect(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS); if ((time(NULL) - MQTTLastStatus) > 1800) MQTTSendStatus(); } void MQTTDisconnect() { if (MQTT_Connected) { MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnect(client, &disc_opts); MQTT_Connecting = MQTT_Connected = 0; // Try to recconect. If it fails system will rety every minute MQTTConnect(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS); } } DllExport int APIENTRY MQTTSend(char * topic, char * Msg, int MsgLen) { int rc; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; pubmsg.payload = Msg; pubmsg.payloadlen = MsgLen; rc = MQTTAsync_sendMessage(client, topic, &pubmsg, &opts); if (rc) MQTTDisconnect(); return rc; } void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTT_Connecting = 0; MQTT_Connected = 1; printf("Successful MQTT connection\n"); // Send start up message MQTTSendStatus(); } void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("MQTT connection failed, rc %d\n", response ? response->code : 0); MQTT_Connecting = 0; } char* jsonEncodeMessage(MESSAGE *msg) { char From[10]; char To[10]; char buffer[1024]; unsigned long long SaveMMASK = MMASK; BOOL SaveMTX = MTX; BOOL SaveMCOM = MCOM; BOOL SaveMUI = MUIONLY; int len; char *msg_str; char payload_timestamp[16]; struct tm * TM = localtime(&msg->Timestamp); sprintf(payload_timestamp, "%02d:%02d:%02d", TM->tm_hour, TM->tm_min, TM->tm_sec); IntSetTraceOptionsEx(MMASK, TRUE, TRUE, FALSE); From[ConvFromAX25(msg->ORIGIN, From)] = 0; To[ConvFromAX25(msg->DEST, To)] = 0; len = IntDecodeFrame(msg, buffer, msg->Timestamp, 0xffffffffffffffff, FALSE, FALSE); IntSetTraceOptionsEx(SaveMMASK, SaveMTX, SaveMCOM, SaveMUI); buffer[len] = 0; #ifdef WIN32 msg_str = zalloc(2048); sprintf(msg_str, "{\"from\": \"%s\", \"to\": \"%s\", \"payload\": \"%s\", \"port\": %d, \"timestamp\": \"%s\"}", From, To, buffer, msg->PORT, payload_timestamp); #else json_t *root; root = json_object(); json_object_set_new(root, "from", json_string(From)); json_object_set_new(root, "to", json_string(To)); json_object_set_new(root, "payload", json_string(buffer)); json_object_set_new(root, "port", json_integer(msg->PORT)); sprintf(payload_timestamp, "%02d:%02d:%02d", TM->tm_hour, TM->tm_min, TM->tm_sec); json_object_set_new(root, "timestamp", json_string(payload_timestamp)); msg_str = json_dumps(root, 0); json_decref(root); #endif return msg_str; } void MQTTKISSTX(void *message) { MESSAGE *msg = (MESSAGE *)message; char topic[256]; char *msg_str; sprintf(topic, "PACKETNODE/ax25/trace/bpqformat/%s/sent/%d", NODECALLLOPPED, msg->PORT); msg_str = jsonEncodeMessage(msg); MQTTSend(topic, msg_str, strlen(msg_str)); free(msg_str); } void MQTTKISSTX_RAW(char* buffer, int bufferLength, void* PORT) { PPORTCONTROL PPORT = (PPORTCONTROL)PORT; char topic[256]; sprintf(topic, "PACKETNODE/kiss/%s/sent/%d", NODECALLLOPPED, PPORT->PORTNUMBER); MQTTSend(topic, buffer, bufferLength); } void MQTTKISSRX(void *message) { MESSAGE *msg = (MESSAGE *)message; char topic[256]; char *msg_str; sprintf(topic, "PACKETNODE/ax25/trace/bpqformat/%s/rcvd/%d", NODECALLLOPPED, msg->PORT); msg_str = jsonEncodeMessage(msg); MQTTSend(topic, msg_str, strlen(msg_str)); free(msg_str); } void MQTTKISSRX_RAW(char* buffer, int bufferLength, void* PORT) { PPORTCONTROL PPORT = (PPORTCONTROL)PORT; char topic[256]; sprintf(topic, "PACKETNODE/kiss/%s/rcvd/%d", NODECALLLOPPED, PPORT->PORTNUMBER); MQTTSend(topic, buffer, bufferLength); } void MQTTReportSession(char * Msg) { char topic[256]; sprintf(topic, "PACKETNODE/stats/session/%s", NODECALLLOPPED); MQTTSend(topic, Msg, strlen(Msg)); } char* replace(char* str, char* a, char* b) { int len = strlen(str); int lena = strlen(a), lenb = strlen(b); char * p; for (p = str; p = strstr(p, a); p) { if (lena != lenb) // shift end as needed memmove(p + lenb, p + lena, len - (p - str) + lenb); memcpy(p, b, lenb); } return str; } int MQTTPublish(void *message, char *topic) { MESSAGE *msg = (MESSAGE *)message; char From[10]; char To[10]; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; unsigned long long SaveMMASK = MMASK; BOOL SaveMTX = MTX; BOOL SaveMCOM = MCOM; BOOL SaveMUI = MUIONLY; int len; char* replaced_buffer; char buffer[1024]; time_t timestamp = msg->Timestamp; From[ConvFromAX25(msg->ORIGIN, From)] = 0; To[ConvFromAX25(msg->DEST, To)] = 0; IntSetTraceOptionsEx(8, TRUE, TRUE, FALSE); len = IntDecodeFrame(msg, buffer, timestamp, 1, FALSE, FALSE); IntSetTraceOptionsEx(SaveMMASK, SaveMTX, SaveMCOM, SaveMUI); // MQTT _really_ doesn't like \r, so replace it with something // that is at least human readable replaced_buffer = replace(buffer, "\r", "\r\n"); pubmsg.payload = replaced_buffer; pubmsg.payloadlen = strlen(replaced_buffer); printf("%s\n", replaced_buffer); return MQTTAsync_sendMessage(client, topic, &pubmsg, &opts); } int MQTTConnect(char* host, int port, char* user, char* pass) { MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; char hostString[256]; sprintf(hostString, "tcp://%s:%d", host, port); printf("MQTT Connect to %s\n", hostString); rc = MQTTAsync_create(&client, hostString, NODECALLLOPPED, MQTTCLIENT_PERSISTENCE_NONE, NULL); if (rc != MQTTASYNC_SUCCESS) { printf("Failed to create client, return code %d\n", rc); return rc; } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = user; conn_opts.password = pass; conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; // conn_opts.automaticReconnect = 1; // conn_opts.minRetryInterval = 30; // conn_opts.maxRetryInterval = 300; rc = MQTTAsync_connect(client, &conn_opts); if (rc != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return code %d\n", rc); return rc; } MQTT_Connecting = 1; return 0; } // Message Database Entry. Designed to be compatible with FBB #define NBBBS 160 // Max BBSes we can forward to. Must be Multiple of 8, and must be 80 for FBB compatibliliy #define NBMASK NBBBS/8 // Number of bytes in Forward bitlists. #pragma pack(1) struct MsgInfo { char type; char status; int number; int length; int xdatereceived; char bbsfrom[7]; // ? BBS we got it from ? char via[41]; char from[7]; char to[7]; char bid[13]; char title[61]; int nntpnum; // Number within topic (ie Bull TO Addr) - used for nntp UCHAR B2Flags; // Not all flags specific to B2 #define B2Msg 1 // Set if Message File is a formatted B2 message #define Attachments 2 // Set if B2 message has attachments #define FromPaclink 4 #define FromCMS 8 #define FromRMSExpress 16 #define RadioOnlyMsg 32 // Received using call-T #define RadioOnlyFwd 64 // Received using call-R #define WarnNotForwardedSent 128 int xdatecreated; int xdatechanged; UCHAR fbbs[NBMASK]; UCHAR forw[NBMASK]; char emailfrom[41]; char Locked; // Set if selected for sending (NTS Pickup) char Defered; // FBB response '=' received UCHAR UTF8; // Set if Message is in UTF8 (ie from POP/SMTP) // For 64 bit time_t compatibility define as long long // (so struct is same with 32 or 64 bit time_t) int64_t datereceived; int64_t datecreated; int64_t datechanged; char Spare[61 - 24]; // For future use } ; #pragma pack() void MQTTMessageEvent(void* message) { struct MsgInfo* msg = (struct MsgInfo *)message; char *msg_str; char * ptr; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; char topic[256]; json_t *root = json_object(); json_object_set_new(root, "id", json_integer(msg->number)); json_object_set_new(root, "size", json_integer(msg->length)); json_object_set_new(root, "type", json_string(msg->type == 'P' ? "P" : "B")); json_object_set_new(root, "to", json_string(msg->to)); json_object_set_new(root, "from", json_string(msg->from)); json_object_set_new(root, "subj", json_string(msg->title)); switch(msg->status) { case 'N': json_object_set_new(root, "event", json_string("newmsg")); break; case 'F': json_object_set_new(root, "event", json_string("fwded")); break; case 'R': json_object_set_new(root, "event", json_string("read")); break; case 'K': json_object_set_new(root, "event", json_string("killed")); break; } msg_str = json_dumps(root, 0); pubmsg.payload = msg_str; pubmsg.payloadlen = strlen(msg_str); sprintf(topic, "PACKETNODE/event/%s/pmsg", NODECALLLOPPED); MQTTAsync_sendMessage(client, topic, &pubmsg, &opts); } #else // Dummies ofr build without MQTT libraries int MQTTConnect(char* host, int port, char* user, char* pass) { return 0; } void MQTTKISSTX(void *message) {}; void MQTTKISSTX_RAW(char* buffer, int bufferLength, void* PORT) {}; void MQTTKISSRX(void *message) {}; void MQTTKISSRX_RAW(char* buffer, int bufferLength, void* PORT) {}; void MQTTTimer() {}; void MQTTReportSession(char * Msg) {}; #endif