import json
import logging
import os
import signal
from datetime import datetime

import paho.mqtt.client as mqtt
import requests
from dotenv import find_dotenv, load_dotenv
from influxdb import InfluxDBClient

# Load settings from a .env file (if one is found) into the environment.
load_dotenv(find_dotenv())

# Source MQTT broker (The Things Network) credentials.
SRC_MQTT_HOST = os.getenv("SRC_MQTT_HOST")
SRC_MQTT_USER = os.getenv("SRC_MQTT_USER")
SRC_MQTT_PASS = os.getenv("SRC_MQTT_PASS")

# Destination MQTT broker (OwnTracks) credentials.
DST_MQTT_HOST = os.getenv("DST_MQTT_HOST")
DST_MQTT_USER = os.getenv("DST_MQTT_USER")
DST_MQTT_PASS = os.getenv("DST_MQTT_PASS")

# Destination InfluxDB connection settings.
DST_INFLUX_HOST = os.getenv("DST_INFLUX_HOST")
DST_INFLUX_USER = os.getenv("DST_INFLUX_USER")
DST_INFLUX_PASS = os.getenv("DST_INFLUX_PASS")
DST_INFLUX_DB = os.getenv("DST_INFLUX_DB")

# Base URL of the Traccar endpoint and the healthchecks.io ping URL.
DST_TRACCAR_URL = os.getenv("DST_TRACCAR_URL")
HC_PING_URL = os.getenv("HC_PING_URL")

VERSION = "v3.1"
OT_TOPIC = "owntracks/tobru/dragino"
OT_TID = "dragino"

# Upper bound (seconds) for the health-check ping so that a slow or
# unreachable endpoint cannot block the MQTT log callback indefinitely.
HC_PING_TIMEOUT = 10


def on_connect_ttn(client, userdata, flags, rc):
    """Log the TTN connection result and subscribe to all device uplinks."""
    logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc))
    client.subscribe("v3/+/devices/+/up")


def on_connect_ot(client, userdata, flags, rc):
    """Log the result of connecting to the OwnTracks broker."""
    logging.info("connected to ot %s - %s", DST_MQTT_HOST, str(rc))


def on_publish_ot(client, userdata, rc):
    """Log that a message was published to the OwnTracks broker.

    NOTE(review): paho's classic ``on_publish`` callback passes the message
    id as its third argument, so ``rc`` is presumably a message id rather
    than a result code - confirm before relying on it.
    """
    logging.info("published data to ot")


def on_log(client, userdata, level, buf):
    """Forward paho log messages to ``logging`` and report liveness.

    Every message is re-logged at the matching ``logging`` level. When the
    message mentions ``PINGRESP`` the healthchecks.io URL is pinged to
    signal that the connection is alive.

    The ping is best effort: it is skipped when ``HC_PING_URL`` is not
    configured, it is bounded by ``HC_PING_TIMEOUT``, and request failures
    are logged instead of being raised from the callback.
    """
    logging_level = mqtt.LOGGING_LEVEL[level]
    logging.log(logging_level, buf)

    if "PINGRESP" not in str(buf):
        return
    if not HC_PING_URL:
        return

    # report to https://healthchecks.io to tell that the connection is alive
    try:
        requests.get(HC_PING_URL, timeout=HC_PING_TIMEOUT)
    except requests.RequestException as err:
        logging.warning("health check ping failed: %s", err)


# The callback for when a PUBLISH message is received from the server.
def on_message_ttn(client, userdata, msg):
    """Process one TTN uplink and forward it to OwnTracks, Traccar and InfluxDB.

    ``msg.payload`` is parsed as JSON. The handler reads the
    ``end_device_ids``, ``uplink_message.network_ids``,
    ``uplink_message.rx_metadata[0]``, ``uplink_message.decoded_payload``
    and ``uplink_message.consumed_airtime`` entries; apart from ``HDOP``
    a missing entry raises ``KeyError``.

    Steps:
      1. Look up the receiving gateway (best effort, only used for logging).
      2. Publish an OwnTracks ``location`` message on ``OT_TOPIC``.
      3. Send the position to Traccar (failures are logged, not raised, so
         the InfluxDB write below still happens).
      4. Write a ``dragino`` point to InfluxDB.
    """
    # Seconds to wait for any single outbound HTTP request.
    http_timeout = 10

    data = json.loads(msg.payload)
    uplink = data["uplink_message"]
    payload = uplink["decoded_payload"]
    first_rx = uplink["rx_metadata"][0]

    logging.info(
        "message from ttn received for %s via %s",
        data["end_device_ids"]["device_id"],
        uplink["network_ids"]["cluster_id"],
    )

    # retrieve info about gateway
    gtw_id = first_rx["gateway_ids"]["gateway_id"]
    try:
        gtw_info = requests.get(
            "https://www.thethingsnetwork.org/gateway-data/gateway/" + gtw_id,
            timeout=http_timeout,
        ).json()
        logging.info(
            "received via gw %s, %s, owned by %s",
            gtw_id,
            gtw_info[gtw_id]["description"],
            gtw_info[gtw_id]["owner"],
        )
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Lookup failed or returned an unexpected document: log the id only.
        logging.info("received via gw %s", gtw_id)

    # the decoded data
    bat_v = payload["BatV"]
    motion_detection = payload["MD"]
    led_status = payload["LON"]
    fw_version = payload["FW"]
    pitch = payload["Pitch"]
    roll = payload["Roll"]
    latitude = payload["Latitude"]
    longitude = payload["Longitude"]
    altitude = payload["Altitude"]
    # A missing or falsy HDOP is reported as 0.
    hdop = payload.get("HDOP") or 0
    rssi = first_rx["rssi"]
    # Drop the last character of the airtime string
    # (presumably a trailing unit suffix such as "s" - confirm).
    airtime = uplink["consumed_airtime"][:-1]
    timestamp = int(datetime.timestamp(datetime.now()))

    # max is 4 volts, 3 volts is considered empty; clamp so readings
    # outside that window still give a percentage between 0 and 100
    batpercent = max(0, min(100, round((bat_v - 3) * 100)))

    alarm_status = payload["ALARM_status"] == "TRUE"
    if alarm_status:
        logging.info("Red button pushed!")

    logging.info("Motion detection: %s", motion_detection)
    logging.info("LED status for position: %s", led_status)
    logging.info("Firmware version: %s", fw_version)

    got_fix = latitude != 0
    if got_fix:
        logging.info(
            "GPS data (Latitude) present: lat %s, lon %s",
            latitude,
            longitude,
        )
    else:
        logging.info("no GPS data (Latitude) present")
        # set GPS data to 0 for InfluxDB
        latitude = 0.0
        longitude = 0.0

    # transform received data into OwnTracks format
    ot_data = json.dumps(
        {
            "_type": "location",
            "lat": latitude,
            "lon": longitude,
            "batt": batpercent,
            "t": "p",
            "tid": OT_TID,
            "tst": timestamp,
            "conn": "m",
        }
    )

    # publish to owntracks
    logging.info("publishing data to owntracks via mqtt to topic %s", OT_TOPIC)
    client_ot.publish(OT_TOPIC, payload=ot_data, retain=True, qos=1)

    # send to traccar; the query string is built by requests so every
    # parameter (including "timestamp") is named and encoded correctly
    logging.info("publishing data to traccar")
    try:
        requests.get(
            f"{DST_TRACCAR_URL}/",
            params={
                "id": OT_TID,
                "lat": latitude,
                "lon": longitude,
                "timestamp": timestamp,
                "hdop": hdop,
                "altitude": altitude,
                "speed": 0,
            },
            timeout=http_timeout,
        )
    except requests.RequestException as err:
        logging.warning("publishing data to traccar failed: %s", err)

    # write to influxdb
    logging.info("writing data to influxdb")
    influxdb_points = [
        {
            "measurement": "dragino",
            "tags": {
                "device": "lgt92",
            },
            "fields": {
                "bat": float(bat_v),
                "pitch": float(pitch),
                "roll": float(roll),
                "lat": float(latitude),
                "lon": float(longitude),
                "alarm": int(alarm_status),
                "airtime": float(airtime),
                "rssi": rssi,
                "fix": got_fix,
            },
        }
    ]
    influxdb.write_points(influxdb_points)

    logging.info("data processing done")


def shutdown():
    """Disconnect both MQTT clients and stop their network loops."""
    logging.info("disconnecting from mqtt")
    client_ot.disconnect()
    client_ot.loop_stop()
    client_ttn.disconnect()
    client_ttn.loop_stop()


def handleSIGTERM(signalNumber, frame):
    """Signal handler: log the SIGTERM and shut the MQTT clients down."""
    logging.info("got SIGTERM")
    shutdown()


if __name__ == "__main__":
    signal.signal(signal.SIGTERM, handleSIGTERM)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %Z",
    )

    logging.info("Starting ioteer lgt92. " + VERSION)

    # Prepare InfluxDB
    influxdb = InfluxDBClient(
        host=DST_INFLUX_HOST,
        port=443,
        database=DST_INFLUX_DB,
        username=DST_INFLUX_USER,
        password=DST_INFLUX_PASS,
        ssl=True,
        verify_ssl=True,
    )

    # Prepare MQTT for The Things Network
    client_ttn = mqtt.Client()
    client_ttn.enable_logger()
    client_ttn.on_connect = on_connect_ttn
    client_ttn.on_message = on_message_ttn
    client_ttn.on_log = on_log
    client_ttn.username_pw_set(SRC_MQTT_USER, SRC_MQTT_PASS)
    client_ttn.tls_set()
    client_ttn.connect(SRC_MQTT_HOST, 8883, 60)

    # Prepare MQTT for OwnTracks; the last-will message is registered on
    # the same topic as the location updates
    ot_lwt = json.dumps(
        {
            "_type": "lwt",
            "tst": int(datetime.timestamp(datetime.now())),
        }
    )
    client_ot = mqtt.Client()
    client_ot.enable_logger()
    client_ot.on_connect = on_connect_ot
    client_ot.on_publish = on_publish_ot
    client_ot.on_log = on_log
    client_ot.username_pw_set(DST_MQTT_USER, DST_MQTT_PASS)
    client_ot.tls_set()
    client_ot.will_set(OT_TOPIC, payload=ot_lwt, qos=1, retain=True)
    client_ot.connect(DST_MQTT_HOST, 8883, 60)

    try:
        # Connect to MQTT and react to messages: the OwnTracks client runs
        # in a background loop, the TTN client loops in the foreground
        client_ot.loop_start()
        client_ttn.loop_forever()
    except KeyboardInterrupt:
        shutdown()

    logging.info("tschuess")