// linbpq/mqtt.c
//
// 435 lines
// 10 KiB
// C

#define _CRT_SECURE_NO_DEPRECATE
#ifndef NOMQTT
#include "MQTTAsync.h"
#ifndef WIN32
#include <jansson.h>
#endif
#include "CHeaders.h"
#include "asmstrucs.h"
#include "mqtt.h"
extern int MQTT_Connecting;
extern int MQTT_Connected;
DllExport int APIENTRY MQTTSend(char * topic, char * Msg, int MsgLen);
MQTTAsync client = NULL;
time_t MQTTLastStatus = 0;
// Publish this node's "online" status and remember when that was done.
//
// The message goes to the topic PACKETNODE/<NODECALLLOPPED>. MQTTLastStatus
// is refreshed after the send attempt whether or not MQTTSend() succeeded.
void MQTTSendStatus()
{
	char statusJson[128] = "{\"status\":\"online\"}";
	char statusTopic[256];

	sprintf(statusTopic, "PACKETNODE/%s", NODECALLLOPPED);
	MQTTSend(statusTopic, statusJson, strlen(statusJson));

	MQTTLastStatus = time(NULL);
}
void MQTTTimer()
{
if (MQTT_Connecting == 0 && MQTT_Connected == 0)
MQTTConnect(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS);
if ((time(NULL) - MQTTLastStatus) > 1800)
MQTTSendStatus();
}
// Drop the current broker session (if any) and immediately try to reconnect.
//
// Does nothing unless MQTT_Connected is set. Both state flags are cleared
// before the new connection attempt is started.
// NOTE(review): the old client handle is not destroyed before MQTTConnect()
// creates a new one - confirm whether that leaks a Paho client per reconnect.
void MQTTDisconnect()
{
	MQTTAsync_disconnectOptions discOptions = MQTTAsync_disconnectOptions_initializer;

	if (!MQTT_Connected)
		return;

	MQTTAsync_disconnect(client, &discOptions);

	MQTT_Connected = 0;
	MQTT_Connecting = 0;

	// Try to reconnect. If it fails the system will retry every minute
	MQTTConnect(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS);
}
// Publish MsgLen bytes from Msg on the given topic using the shared client.
//
// Returns the result of MQTTAsync_sendMessage(). Any non-zero result also
// triggers MQTTDisconnect(), which only acts when a session is marked as
// connected.
DllExport int APIENTRY MQTTSend(char * topic, char * Msg, int MsgLen)
{
	MQTTAsync_message outMsg = MQTTAsync_message_initializer;
	MQTTAsync_responseOptions respOpts = MQTTAsync_responseOptions_initializer;
	int result;

	outMsg.payloadlen = MsgLen;
	outMsg.payload = Msg;

	result = MQTTAsync_sendMessage(client, topic, &outMsg, &respOpts);

	if (result != 0)
	{
		MQTTDisconnect();
	}

	return result;
}
// Connect-success callback registered in MQTTConnect().
//
// Marks the link as connected (and no longer connecting), logs the event and
// publishes the start-up status message. Neither parameter is used.
void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTT_Connecting = 0;
	MQTT_Connected = 1;

	printf("Successful MQTT connection\n");

	// Send start up message
	MQTTSendStatus();
}
// Connect-failure callback registered in MQTTConnect().
//
// Logs the failure code (0 when no failure data was supplied) and clears
// MQTT_Connecting so that a later MQTTTimer() call can start a new attempt.
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
	int failureCode = 0;

	if (response)
		failureCode = response->code;

	printf("MQTT connection failed, rc %d\n", failureCode);

	MQTT_Connecting = 0;
}
// Build a JSON description of a traced frame.
//
// The object carries "from", "to", "payload" (the text produced by
// IntDecodeFrame), "port" and "timestamp" (HH:MM:SS, local time).
// The global trace options are switched for the duration of the decode and
// restored afterwards.
//
// Returns a heap-allocated, NUL-terminated string, or NULL if it could not
// be built. Callers in this file release it with free().
char* jsonEncodeMessage(MESSAGE *msg)
{
	char From[10];
	char To[10];
	char buffer[1024];
	unsigned long long SaveMMASK = MMASK;
	BOOL SaveMTX = MTX;
	BOOL SaveMCOM = MCOM;
	BOOL SaveMUI = MUIONLY;
	int len;
	char *msg_str;
	char payload_timestamp[16];
	struct tm * TM = localtime(&msg->Timestamp);
#ifndef WIN32
	json_t *root;
#endif

	// localtime() can return NULL for an unrepresentable time; fall back to
	// a fixed value rather than dereferencing a null pointer.
	if (TM)
		sprintf(payload_timestamp, "%02d:%02d:%02d", TM->tm_hour, TM->tm_min, TM->tm_sec);
	else
		strcpy(payload_timestamp, "00:00:00");

	IntSetTraceOptionsEx(MMASK, TRUE, TRUE, FALSE);

	From[ConvFromAX25(msg->ORIGIN, From)] = 0;
	To[ConvFromAX25(msg->DEST, To)] = 0;

	len = IntDecodeFrame(msg, buffer, msg->Timestamp, 0xffffffffffffffff, FALSE, FALSE);

	IntSetTraceOptionsEx(SaveMMASK, SaveMTX, SaveMCOM, SaveMUI);

	// Keep the terminator inside the buffer whatever length was reported.
	if (len < 0)
		len = 0;
	else if (len > (int)sizeof(buffer) - 1)
		len = (int)sizeof(buffer) - 1;

	buffer[len] = 0;

#ifdef WIN32

	// NOTE(review): the decoded text is inserted without JSON escaping in
	// this branch - confirm consumers tolerate quotes/control characters.
	msg_str = zalloc(2048);

	if (msg_str == NULL)
		return NULL;

	sprintf(msg_str, "{\"from\": \"%s\", \"to\": \"%s\", \"payload\": \"%s\", \"port\": %d, \"timestamp\": \"%s\"}",
		From, To, buffer, msg->PORT, payload_timestamp);

#else

	root = json_object();

	json_object_set_new(root, "from", json_string(From));
	json_object_set_new(root, "to", json_string(To));
	json_object_set_new(root, "payload", json_string(buffer));
	json_object_set_new(root, "port", json_integer(msg->PORT));
	json_object_set_new(root, "timestamp", json_string(payload_timestamp));

	msg_str = json_dumps(root, 0);		// NULL on failure
	json_decref(root);

#endif

	return msg_str;
}
// Publish a JSON trace of a transmitted frame.
//
// message is treated as a MESSAGE *. The topic is
// PACKETNODE/ax25/trace/bpqformat/<NODECALLLOPPED>/sent/<port>.
// Nothing is published if the frame could not be encoded.
void MQTTKISSTX(void *message)
{
	MESSAGE *msg = (MESSAGE *)message;
	char topic[256];
	char *msg_str;

	sprintf(topic, "PACKETNODE/ax25/trace/bpqformat/%s/sent/%d", NODECALLLOPPED, msg->PORT);

	msg_str = jsonEncodeMessage(msg);

	// The encoder can fail (e.g. json_dumps returning NULL); strlen(NULL)
	// would crash, so skip the publish in that case.
	if (msg_str == NULL)
		return;

	MQTTSend(topic, msg_str, strlen(msg_str));

	free(msg_str);
}
// Publish the raw bytes of a transmitted KISS frame.
//
// PORT is treated as a PPORTCONTROL; its PORTNUMBER becomes the last topic
// element: PACKETNODE/kiss/<NODECALLLOPPED>/sent/<port>.
void MQTTKISSTX_RAW(char* buffer, int bufferLength, void* PORT)
{
	char kissTopic[256];
	PPORTCONTROL portInfo = (PPORTCONTROL)PORT;

	sprintf(kissTopic, "PACKETNODE/kiss/%s/sent/%d", NODECALLLOPPED, portInfo->PORTNUMBER);

	MQTTSend(kissTopic, buffer, bufferLength);
}
// Publish a JSON trace of a received frame.
//
// message is treated as a MESSAGE *. The topic is
// PACKETNODE/ax25/trace/bpqformat/<NODECALLLOPPED>/rcvd/<port>.
// Nothing is published if the frame could not be encoded.
void MQTTKISSRX(void *message)
{
	MESSAGE *msg = (MESSAGE *)message;
	char topic[256];
	char *msg_str;

	sprintf(topic, "PACKETNODE/ax25/trace/bpqformat/%s/rcvd/%d", NODECALLLOPPED, msg->PORT);

	msg_str = jsonEncodeMessage(msg);

	// The encoder can fail (e.g. json_dumps returning NULL); strlen(NULL)
	// would crash, so skip the publish in that case.
	if (msg_str == NULL)
		return;

	MQTTSend(topic, msg_str, strlen(msg_str));

	free(msg_str);
}
// Publish the raw bytes of a received KISS frame.
//
// PORT is treated as a PPORTCONTROL; its PORTNUMBER becomes the last topic
// element: PACKETNODE/kiss/<NODECALLLOPPED>/rcvd/<port>.
void MQTTKISSRX_RAW(char* buffer, int bufferLength, void* PORT)
{
	char kissTopic[256];
	PPORTCONTROL portInfo = (PPORTCONTROL)PORT;

	sprintf(kissTopic, "PACKETNODE/kiss/%s/rcvd/%d", NODECALLLOPPED, portInfo->PORTNUMBER);

	MQTTSend(kissTopic, buffer, bufferLength);
}
// Publish a session statistics message.
//
// Msg must be a NUL-terminated string; it is sent unchanged to
// PACKETNODE/stats/session/<NODECALLLOPPED>.
void MQTTReportSession(char * Msg)
{
	char sessionTopic[256];
	int msgLength;

	sprintf(sessionTopic, "PACKETNODE/stats/session/%s", NODECALLLOPPED);

	msgLength = strlen(Msg);
	MQTTSend(sessionTopic, Msg, msgLength);
}
// Replace every occurrence of a with b inside str, in place.
//
// str - NUL-terminated, writable buffer. When b is longer than a the caller
//       must guarantee the buffer is large enough for the expanded result.
// a   - text to search for. An empty string matches nothing.
// b   - replacement text. It may itself contain a; replaced text is never
//       rescanned, so the function always terminates.
//
// Returns str. If any argument is NULL, str is returned unchanged.
char* replace(char* str, char* a, char* b)
{
	size_t len, lena, lenb;
	char * p;

	if (str == NULL || a == NULL || b == NULL)
		return str;

	len = strlen(str);
	lena = strlen(a);
	lenb = strlen(b);

	if (lena == 0)			// strstr() would match at every position
		return str;

	p = str;

	while ((p = strstr(p, a)) != NULL)
	{
		if (lena != lenb)
		{
			// Shift the tail (everything after the match, including the
			// terminating NUL) to its new position.
			memmove(p + lenb, p + lena, len - (size_t)(p - str) - lena + 1);
			len = len - lena + lenb;
		}

		memcpy(p, b, lenb);

		p += lenb;			// continue after the text just written
	}

	return str;
}
// Decode a frame to text and publish it on the given topic.
//
// message is treated as a MESSAGE *. The trace options are switched while
// the frame is decoded and restored afterwards. Every '\r' in the decoded
// text is sent as "\r\n". The text is also echoed to stdout.
//
// Returns the result of MQTTAsync_sendMessage().
int MQTTPublish(void *message, char *topic)
{
	MESSAGE *msg = (MESSAGE *)message;
	char From[10];
	char To[10];
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
	unsigned long long SaveMMASK = MMASK;
	BOOL SaveMTX = MTX;
	BOOL SaveMCOM = MCOM;
	BOOL SaveMUI = MUIONLY;
	int len;
	int in;
	int out = 0;
	char buffer[1024];
	char expanded[2 * 1024 + 1];	// worst case: every byte is '\r', plus NUL
	time_t timestamp = msg->Timestamp;

	// The converted callsigns are not used further in this function.
	From[ConvFromAX25(msg->ORIGIN, From)] = 0;
	To[ConvFromAX25(msg->DEST, To)] = 0;

	IntSetTraceOptionsEx(8, TRUE, TRUE, FALSE);
	len = IntDecodeFrame(msg, buffer, timestamp, 1, FALSE, FALSE);
	IntSetTraceOptionsEx(SaveMMASK, SaveMTX, SaveMCOM, SaveMUI);

	// Never read past the decode buffer, whatever length was reported.
	if (len < 0)
		len = 0;
	else if (len > (int)sizeof(buffer))
		len = (int)sizeof(buffer);

	// MQTT _really_ doesn't like \r, so replace it with something
	// that is at least human readable. The copy goes into a separate,
	// double-sized buffer so the expansion cannot overrun, and it stops at
	// the reported length (or an embedded NUL) because the decoder is not
	// relied on to terminate the text.
	for (in = 0; in < len && buffer[in] != 0; in++)
	{
		expanded[out++] = buffer[in];

		if (buffer[in] == '\r')
			expanded[out++] = '\n';
	}

	expanded[out] = 0;

	pubmsg.payload = expanded;
	pubmsg.payloadlen = out;

	printf("%s\n", expanded);

	return MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);
}
// Create the shared MQTT client and start an asynchronous connection.
//
// host, port - broker address; combined into a "tcp://host:port" URI.
// user, pass - credentials passed to the broker.
//
// Returns 0 once the connect request has been started (MQTT_Connecting is
// then set and onConnect/onConnectFailure report the outcome), or the Paho
// error code if the client could not be created or the request not started.
//
// NOTE(review): a client handle left over from an earlier session is
// overwritten here without being destroyed - confirm whether that needs
// releasing (it may not be safe to do so from inside a Paho callback).
int MQTTConnect(char* host, int port, char* user, char* pass)
{
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;
	char hostString[300];

	// The precision bounds the host name so the URI always fits hostString
	// (6 + 255 + 1 + at most 11 digits + NUL).
	sprintf(hostString, "tcp://%.255s:%d", host ? host : "", port);

	printf("MQTT Connect to %s\n", hostString);

	rc = MQTTAsync_create(&client, hostString, NODECALLLOPPED, MQTTCLIENT_PERSISTENCE_NONE, NULL);

	if (rc != MQTTASYNC_SUCCESS)
	{
		printf("Failed to create client, return code %d\n", rc);
		return rc;
	}

	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.username = user;
	conn_opts.password = pass;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	// conn_opts.automaticReconnect = 1;
	// conn_opts.minRetryInterval = 30;
	// conn_opts.maxRetryInterval = 300;

	rc = MQTTAsync_connect(client, &conn_opts);

	if (rc != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);

		// The client just created will never be used; release it so that
		// the next attempt does not leak it.
		MQTTAsync_destroy(&client);
		client = NULL;

		return rc;
	}

	MQTT_Connecting = 1;

	return 0;
}
// Message Database Entry. Designed to be compatible with FBB
// The structure is byte-packed (#pragma pack(1)), so field order and sizes
// define the record layout and must not be changed.
#define NBBBS 160 // Max BBSes we can forward to. Must be Multiple of 8, and must be 80 for FBB compatibility
// NOTE(review): the expansion below is not parenthesised; it is only used as
// an array dimension in this file.
#define NBMASK NBBBS/8 // Number of bytes in Forward bitlists.
#pragma pack(1)
struct MsgInfo
{
char type; // 'P' is reported as type "P" by MQTTMessageEvent, anything else as "B"
char status; // 'N', 'F', 'R' and 'K' are mapped to events by MQTTMessageEvent
int number; // Published as "id"
int length; // Published as "size"
int xdatereceived;
char bbsfrom[7]; // ? BBS we got it from ?
char via[41];
char from[7];
char to[7];
char bid[13];
char title[61]; // Published as "subj"
int nntpnum; // Number within topic (ie Bull TO Addr) - used for nntp
UCHAR B2Flags; // Not all flags specific to B2
// Bit values for B2Flags
#define B2Msg 1 // Set if Message File is a formatted B2 message
#define Attachments 2 // Set if B2 message has attachments
#define FromPaclink 4
#define FromCMS 8
#define FromRMSExpress 16
#define RadioOnlyMsg 32 // Received using call-T
#define RadioOnlyFwd 64 // Received using call-R
#define WarnNotForwardedSent 128
int xdatecreated;
int xdatechanged;
UCHAR fbbs[NBMASK]; // Forward bitlist, NBMASK bytes (one bit per BBS)
UCHAR forw[NBMASK]; // Forward bitlist, NBMASK bytes (one bit per BBS)
char emailfrom[41];
char Locked; // Set if selected for sending (NTS Pickup)
char Defered; // FBB response '=' received
UCHAR UTF8; // Set if Message is in UTF8 (ie from POP/SMTP)
// For 64 bit time_t compatibility define as long long
// (so struct is same with 32 or 64 bit time_t)
int64_t datereceived;
int64_t datecreated;
int64_t datechanged;
// The 24 subtracted here presumably accounts for the three 8-byte dates above
char Spare[61 - 24]; // For future use
} ;
#pragma pack()
// Publish a JSON event describing a mail message.
//
// message is treated as a struct MsgInfo *. The object carries "id", "size",
// "type" ("P" for type 'P', otherwise "B"), "to", "from" and "subj". An
// "event" member is added for status 'N' (newmsg), 'F' (fwded), 'R' (read)
// and 'K' (killed); other status values produce no "event" member.
// The result is sent to PACKETNODE/event/<NODECALLLOPPED>/pmsg. The send
// result is not checked.
void MQTTMessageEvent(void* message)
{
	struct MsgInfo* msg = (struct MsgInfo *)message;
	char *msg_str;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
	char topic[256];
	json_t *root = json_object();

	json_object_set_new(root, "id", json_integer(msg->number));
	json_object_set_new(root, "size", json_integer(msg->length));
	json_object_set_new(root, "type", json_string(msg->type == 'P' ? "P" : "B"));
	json_object_set_new(root, "to", json_string(msg->to));
	json_object_set_new(root, "from", json_string(msg->from));
	json_object_set_new(root, "subj", json_string(msg->title));

	switch(msg->status) {
	case 'N':
		json_object_set_new(root, "event", json_string("newmsg"));
		break;
	case 'F':
		json_object_set_new(root, "event", json_string("fwded"));
		break;
	case 'R':
		json_object_set_new(root, "event", json_string("read"));
		break;
	case 'K':
		json_object_set_new(root, "event", json_string("killed"));
		break;
	default:
		break;
	}

	msg_str = json_dumps(root, 0);

	// The serialised text is all we need from here on; drop the object so
	// it is not leaked on every event.
	json_decref(root);

	if (msg_str == NULL)		// serialisation failed; nothing to send
		return;

	pubmsg.payload = msg_str;
	pubmsg.payloadlen = strlen(msg_str);

	sprintf(topic, "PACKETNODE/event/%s/pmsg", NODECALLLOPPED);

	MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);

	// Released straight after the send call, as the other publishers in
	// this file do with their encoded messages.
	free(msg_str);
}
#else
// Dummies for builds without the MQTT libraries
// No-op stand-ins used when NOMQTT is defined, so callers link without the
// MQTT libraries.

// Always reports success without doing anything.
int MQTTConnect(char* host, int port, char* user, char* pass)
{
	return 0;
}

// The remaining entry points do nothing.
void MQTTKISSTX(void *message) {}
void MQTTKISSTX_RAW(char* buffer, int bufferLength, void* PORT) {}
void MQTTKISSRX(void *message) {}
void MQTTKISSRX_RAW(char* buffer, int bufferLength, void* PORT) {}
void MQTTTimer() {}
void MQTTReportSession(char * Msg) {}
#endif