435 lines
10 KiB
C
435 lines
10 KiB
C
|
|
#define _CRT_SECURE_NO_DEPRECATE
|
|
|
|
#ifndef NOMQTT
|
|
|
|
#include "MQTTAsync.h"
|
|
#ifndef WIN32
|
|
#include <jansson.h>
|
|
#endif
|
|
|
|
#include "CHeaders.h"
|
|
#include "asmstrucs.h"
|
|
#include "mqtt.h"
|
|
|
|
extern int MQTT_Connecting;
|
|
extern int MQTT_Connected;
|
|
|
|
|
|
DllExport int APIENTRY MQTTSend(char * topic, char * Msg, int MsgLen);
|
|
|
|
MQTTAsync client = NULL;
|
|
|
|
time_t MQTTLastStatus = 0;
|
|
|
|
void MQTTSendStatus()
|
|
{
|
|
char topic[256];
|
|
char payload[128];
|
|
|
|
sprintf(topic, "PACKETNODE/%s", NODECALLLOPPED);
|
|
strcpy(payload,"{\"status\":\"online\"}");
|
|
|
|
MQTTSend(topic, payload, strlen(payload));
|
|
MQTTLastStatus = time(NULL);
|
|
}
|
|
|
|
void MQTTTimer()
|
|
{
|
|
if (MQTT_Connecting == 0 && MQTT_Connected == 0)
|
|
MQTTConnect(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS);
|
|
|
|
if ((time(NULL) - MQTTLastStatus) > 1800)
|
|
MQTTSendStatus();
|
|
|
|
}
|
|
|
|
|
|
void MQTTDisconnect()
|
|
{
|
|
if (MQTT_Connected)
|
|
{
|
|
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
|
|
|
|
MQTTAsync_disconnect(client, &disc_opts);
|
|
|
|
MQTT_Connecting = MQTT_Connected = 0;
|
|
|
|
// Try to recconect. If it fails system will rety every minute
|
|
|
|
MQTTConnect(MQTT_HOST, MQTT_PORT, MQTT_USER, MQTT_PASS);
|
|
}
|
|
}
|
|
|
|
DllExport int APIENTRY MQTTSend(char * topic, char * Msg, int MsgLen)
|
|
{
|
|
int rc;
|
|
|
|
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
|
|
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
|
|
|
pubmsg.payload = Msg;
|
|
pubmsg.payloadlen = MsgLen;
|
|
rc = MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);
|
|
|
|
if (rc)
|
|
MQTTDisconnect();
|
|
|
|
return rc;
|
|
}
|
|
|
|
|
|
|
|
void onConnect(void* context, MQTTAsync_successData* response)
|
|
{
|
|
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
|
|
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
|
|
|
MQTT_Connecting = 0;
|
|
MQTT_Connected = 1;
|
|
|
|
printf("Successful MQTT connection\n");
|
|
|
|
// Send start up message
|
|
|
|
MQTTSendStatus();
|
|
|
|
}
|
|
|
|
void onConnectFailure(void* context, MQTTAsync_failureData* response)
|
|
{
|
|
printf("MQTT connection failed, rc %d\n", response ? response->code : 0);
|
|
MQTT_Connecting = 0;
|
|
}
|
|
|
|
|
|
|
|
char* jsonEncodeMessage(MESSAGE *msg)
|
|
{
|
|
char From[10];
|
|
char To[10];
|
|
|
|
char buffer[1024];
|
|
unsigned long long SaveMMASK = MMASK;
|
|
BOOL SaveMTX = MTX;
|
|
BOOL SaveMCOM = MCOM;
|
|
BOOL SaveMUI = MUIONLY;
|
|
int len;
|
|
char *msg_str;
|
|
char payload_timestamp[16];
|
|
|
|
struct tm * TM = localtime(&msg->Timestamp);
|
|
sprintf(payload_timestamp, "%02d:%02d:%02d", TM->tm_hour, TM->tm_min, TM->tm_sec);
|
|
|
|
|
|
IntSetTraceOptionsEx(MMASK, TRUE, TRUE, FALSE);
|
|
From[ConvFromAX25(msg->ORIGIN, From)] = 0;
|
|
To[ConvFromAX25(msg->DEST, To)] = 0;
|
|
|
|
len = IntDecodeFrame(msg, buffer, msg->Timestamp, 0xffffffffffffffff, FALSE, FALSE);
|
|
IntSetTraceOptionsEx(SaveMMASK, SaveMTX, SaveMCOM, SaveMUI);
|
|
|
|
buffer[len] = 0;
|
|
|
|
#ifdef WIN32
|
|
|
|
msg_str = zalloc(2048);
|
|
|
|
sprintf(msg_str, "{\"from\": \"%s\", \"to\": \"%s\", \"payload\": \"%s\", \"port\": %d, \"timestamp\": \"%s\"}",
|
|
From, To, buffer, msg->PORT, payload_timestamp);
|
|
|
|
#else
|
|
|
|
json_t *root;
|
|
|
|
root = json_object();
|
|
|
|
json_object_set_new(root, "from", json_string(From));
|
|
json_object_set_new(root, "to", json_string(To));
|
|
|
|
|
|
json_object_set_new(root, "payload", json_string(buffer));
|
|
json_object_set_new(root, "port", json_integer(msg->PORT));
|
|
sprintf(payload_timestamp, "%02d:%02d:%02d", TM->tm_hour, TM->tm_min, TM->tm_sec);
|
|
json_object_set_new(root, "timestamp", json_string(payload_timestamp));
|
|
msg_str = json_dumps(root, 0);
|
|
json_decref(root);
|
|
|
|
#endif
|
|
|
|
return msg_str;
|
|
}
|
|
|
|
void MQTTKISSTX(void *message)
|
|
{
|
|
MESSAGE *msg = (MESSAGE *)message;
|
|
char topic[256];
|
|
char *msg_str;
|
|
|
|
sprintf(topic, "PACKETNODE/ax25/trace/bpqformat/%s/sent/%d", NODECALLLOPPED, msg->PORT);
|
|
|
|
msg_str = jsonEncodeMessage(msg);
|
|
|
|
MQTTSend(topic, msg_str, strlen(msg_str));
|
|
|
|
free(msg_str);
|
|
}
|
|
|
|
void MQTTKISSTX_RAW(char* buffer, int bufferLength, void* PORT)
|
|
{
|
|
PPORTCONTROL PPORT = (PPORTCONTROL)PORT;
|
|
char topic[256];
|
|
|
|
sprintf(topic, "PACKETNODE/kiss/%s/sent/%d", NODECALLLOPPED, PPORT->PORTNUMBER);
|
|
|
|
MQTTSend(topic, buffer, bufferLength);
|
|
}
|
|
|
|
|
|
void MQTTKISSRX(void *message)
|
|
{
|
|
MESSAGE *msg = (MESSAGE *)message;
|
|
char topic[256];
|
|
char *msg_str;
|
|
|
|
|
|
sprintf(topic, "PACKETNODE/ax25/trace/bpqformat/%s/rcvd/%d", NODECALLLOPPED, msg->PORT);
|
|
msg_str = jsonEncodeMessage(msg);
|
|
|
|
MQTTSend(topic, msg_str, strlen(msg_str));
|
|
|
|
free(msg_str);
|
|
}
|
|
|
|
void MQTTKISSRX_RAW(char* buffer, int bufferLength, void* PORT)
|
|
{
|
|
PPORTCONTROL PPORT = (PPORTCONTROL)PORT;
|
|
char topic[256];
|
|
|
|
sprintf(topic, "PACKETNODE/kiss/%s/rcvd/%d", NODECALLLOPPED, PPORT->PORTNUMBER);
|
|
|
|
MQTTSend(topic, buffer, bufferLength);
|
|
|
|
}
|
|
|
|
void MQTTReportSession(char * Msg)
|
|
{
|
|
char topic[256];
|
|
sprintf(topic, "PACKETNODE/stats/session/%s", NODECALLLOPPED);
|
|
|
|
MQTTSend(topic, Msg, strlen(Msg));
|
|
}
|
|
|
|
|
|
char* replace(char* str, char* a, char* b)
|
|
{
|
|
int len = strlen(str);
|
|
int lena = strlen(a), lenb = strlen(b);
|
|
char * p;
|
|
|
|
for (p = str; p = strstr(p, a); p) {
|
|
if (lena != lenb) // shift end as needed
|
|
memmove(p + lenb, p + lena,
|
|
len - (p - str) + lenb);
|
|
memcpy(p, b, lenb);
|
|
}
|
|
return str;
|
|
}
|
|
|
|
int MQTTPublish(void *message, char *topic)
|
|
{
|
|
MESSAGE *msg = (MESSAGE *)message;
|
|
char From[10];
|
|
char To[10];
|
|
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
|
|
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
|
|
|
unsigned long long SaveMMASK = MMASK;
|
|
BOOL SaveMTX = MTX;
|
|
BOOL SaveMCOM = MCOM;
|
|
BOOL SaveMUI = MUIONLY;
|
|
int len;
|
|
char* replaced_buffer;
|
|
char buffer[1024];
|
|
|
|
time_t timestamp = msg->Timestamp;
|
|
|
|
|
|
From[ConvFromAX25(msg->ORIGIN, From)] = 0;
|
|
To[ConvFromAX25(msg->DEST, To)] = 0;
|
|
|
|
|
|
IntSetTraceOptionsEx(8, TRUE, TRUE, FALSE);
|
|
len = IntDecodeFrame(msg, buffer, timestamp, 1, FALSE, FALSE);
|
|
IntSetTraceOptionsEx(SaveMMASK, SaveMTX, SaveMCOM, SaveMUI);
|
|
|
|
// MQTT _really_ doesn't like \r, so replace it with something
|
|
// that is at least human readable
|
|
|
|
replaced_buffer = replace(buffer, "\r", "\r\n");
|
|
|
|
pubmsg.payload = replaced_buffer;
|
|
pubmsg.payloadlen = strlen(replaced_buffer);
|
|
|
|
printf("%s\n", replaced_buffer);
|
|
|
|
return MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);
|
|
}
|
|
|
|
int MQTTConnect(char* host, int port, char* user, char* pass)
|
|
{
|
|
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
|
|
int rc;
|
|
char hostString[256];
|
|
|
|
sprintf(hostString, "tcp://%s:%d", host, port);
|
|
|
|
printf("MQTT Connect to %s\n", hostString);
|
|
|
|
rc = MQTTAsync_create(&client, hostString, NODECALLLOPPED, MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
|
|
|
if (rc != MQTTASYNC_SUCCESS)
|
|
{
|
|
printf("Failed to create client, return code %d\n", rc);
|
|
return rc;
|
|
}
|
|
|
|
conn_opts.keepAliveInterval = 20;
|
|
conn_opts.cleansession = 1;
|
|
conn_opts.username = user;
|
|
conn_opts.password = pass;
|
|
conn_opts.onSuccess = onConnect;
|
|
conn_opts.onFailure = onConnectFailure;
|
|
// conn_opts.automaticReconnect = 1;
|
|
// conn_opts.minRetryInterval = 30;
|
|
// conn_opts.maxRetryInterval = 300;
|
|
|
|
rc = MQTTAsync_connect(client, &conn_opts);
|
|
|
|
if (rc != MQTTASYNC_SUCCESS)
|
|
{
|
|
printf("Failed to start connect, return code %d\n", rc);
|
|
return rc;
|
|
}
|
|
|
|
MQTT_Connecting = 1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Message Database Entry. Designed to be compatible with FBB
|
|
|
|
#define NBBBS 160 // Max BBSes we can forward to. Must be Multiple of 8, and must be 80 for FBB compatibliliy
|
|
#define NBMASK NBBBS/8 // Number of bytes in Forward bitlists.
|
|
|
|
#pragma pack(1)
|
|
|
|
struct MsgInfo
|
|
{
|
|
char type;
|
|
char status;
|
|
int number;
|
|
int length;
|
|
int xdatereceived;
|
|
char bbsfrom[7]; // ? BBS we got it from ?
|
|
char via[41];
|
|
char from[7];
|
|
char to[7];
|
|
char bid[13];
|
|
char title[61];
|
|
int nntpnum; // Number within topic (ie Bull TO Addr) - used for nntp
|
|
|
|
UCHAR B2Flags; // Not all flags specific to B2
|
|
|
|
#define B2Msg 1 // Set if Message File is a formatted B2 message
|
|
#define Attachments 2 // Set if B2 message has attachments
|
|
#define FromPaclink 4
|
|
#define FromCMS 8
|
|
#define FromRMSExpress 16
|
|
#define RadioOnlyMsg 32 // Received using call-T
|
|
#define RadioOnlyFwd 64 // Received using call-R
|
|
#define WarnNotForwardedSent 128
|
|
|
|
int xdatecreated;
|
|
int xdatechanged;
|
|
UCHAR fbbs[NBMASK];
|
|
UCHAR forw[NBMASK];
|
|
char emailfrom[41];
|
|
char Locked; // Set if selected for sending (NTS Pickup)
|
|
char Defered; // FBB response '=' received
|
|
UCHAR UTF8; // Set if Message is in UTF8 (ie from POP/SMTP)
|
|
|
|
// For 64 bit time_t compatibility define as long long
|
|
// (so struct is same with 32 or 64 bit time_t)
|
|
|
|
int64_t datereceived;
|
|
int64_t datecreated;
|
|
int64_t datechanged;
|
|
|
|
char Spare[61 - 24]; // For future use
|
|
} ;
|
|
|
|
#pragma pack()
|
|
|
|
void MQTTMessageEvent(void* message)
|
|
{
|
|
struct MsgInfo* msg = (struct MsgInfo *)message;
|
|
char *msg_str;
|
|
char * ptr;
|
|
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
|
|
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
|
char topic[256];
|
|
|
|
json_t *root = json_object();
|
|
json_object_set_new(root, "id", json_integer(msg->number));
|
|
json_object_set_new(root, "size", json_integer(msg->length));
|
|
json_object_set_new(root, "type", json_string(msg->type == 'P' ? "P" : "B"));
|
|
json_object_set_new(root, "to", json_string(msg->to));
|
|
json_object_set_new(root, "from", json_string(msg->from));
|
|
json_object_set_new(root, "subj", json_string(msg->title));
|
|
|
|
switch(msg->status) {
|
|
case 'N':
|
|
json_object_set_new(root, "event", json_string("newmsg"));
|
|
break;
|
|
case 'F':
|
|
json_object_set_new(root, "event", json_string("fwded"));
|
|
break;
|
|
case 'R':
|
|
json_object_set_new(root, "event", json_string("read"));
|
|
break;
|
|
case 'K':
|
|
json_object_set_new(root, "event", json_string("killed"));
|
|
break;
|
|
}
|
|
|
|
msg_str = json_dumps(root, 0);
|
|
|
|
pubmsg.payload = msg_str;
|
|
pubmsg.payloadlen = strlen(msg_str);
|
|
|
|
|
|
sprintf(topic, "PACKETNODE/event/%s/pmsg", NODECALLLOPPED);
|
|
|
|
MQTTAsync_sendMessage(client, topic, &pubmsg, &opts);
|
|
}
|
|
|
|
#else
|
|
|
|
// Dummies ofr build without MQTT libraries
|
|
|
|
int MQTTConnect(char* host, int port, char* user, char* pass)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
void MQTTKISSTX(void *message) {};
|
|
void MQTTKISSTX_RAW(char* buffer, int bufferLength, void* PORT) {};
|
|
void MQTTKISSRX(void *message) {};
|
|
void MQTTKISSRX_RAW(char* buffer, int bufferLength, void* PORT) {};
|
|
void MQTTTimer() {};
|
|
void MQTTReportSession(char * Msg) {};
|
|
|
|
#endif
|
|
|