import paho.mqtt.client as mqtt import json import os import logging from datetime import datetime from influxdb import InfluxDBClient from dotenv import find_dotenv, load_dotenv load_dotenv(find_dotenv()) SRC_MQTT_HOST = os.getenv("SRC_MQTT_HOST") SRC_MQTT_USER = os.getenv("SRC_MQTT_USER") SRC_MQTT_PASS = os.getenv("SRC_MQTT_PASS") DST_MQTT_HOST = os.getenv("DST_MQTT_HOST") DST_MQTT_USER = os.getenv("DST_MQTT_USER") DST_MQTT_PASS = os.getenv("DST_MQTT_PASS") DST_INFLUX_HOST = os.getenv("DST_INFLUX_HOST") DST_INFLUX_USER = os.getenv("DST_INFLUX_USER") DST_INFLUX_PASS = os.getenv("DST_INFLUX_PASS") DST_INFLUX_DB = os.getenv("DST_INFLUX_DB") OT_TOPIC="owntracks/tobru/dragino" OT_TID="dragino" def on_connect_ttn(client, userdata, flags, rc): logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc)) client.subscribe("+/devices/+/up") def on_connect_ot(client, userdata, flags, rc): logging.info("connected to ot %s - %s", DST_MQTT_HOST, str(rc)) def on_publish_ot(client, userdata, rc): logging.info("published data to ot") def on_log(client, userdata, level, buf): logging_level = mqtt.LOGGING_LEVEL[level] logging.log(logging_level, buf) logging.info("got a log message for %s level %s: %s", str(userdata), level, str(buf)) # The callback for when a PUBLISH message is received from the server. def on_message_ttn(client, userdata, msg): data = json.loads(msg.payload) logging.info("message from ttn received for %s - #%s", data["dev_id"], data["counter"]) logging.info("received via gw %s", data["metadata"]["gateways"][0]["gtw_id"]) if "latitude" in data["payload_fields"]: # max is 4 volts, 3 volts is considered empty batpercent = round((data["payload_fields"]["batV"] - 3) * 100) # transform received data into OwnTracks format ot_data = json.dumps({ "_type": "location", "lat": data["payload_fields"]["latitude"], "lon": data["payload_fields"]["longitude"], "batt": batpercent, "t": "p", "tid": OT_TID, "tst": int(datetime.timestamp(datetime.now())), "conn": "m", }) # publish to owntracks logging.info("publishing data to owntracks") client_ot.publish(OT_TOPIC,ot_data) # write to influxdb logging.info("writing data to influxdb") influxdb.write_points( [ { "measurement": "dragino", "tags": { "device": "lgt92", }, "fields": { "bat": data["payload_fields"]["batV"], "pitch": data["payload_fields"]["pitch"], "roll": data["payload_fields"]["roll"], "lat": data["payload_fields"]["latitude"], "lon": data["payload_fields"]["longitude"], "alarm": int(data["payload_fields"]["alarm"]), "counter": data["counter"], "airtime": data["metadata"]["airtime"], } } ] ) else: logging.info("no GPS data present - skipping") logging.info("data processing done") logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S %Z' ) # Prepare InfluxDB influxdb = InfluxDBClient( host=DST_INFLUX_HOST, port=443, database=DST_INFLUX_DB, username=DST_INFLUX_USER, password=DST_INFLUX_PASS, ssl=True, verify_ssl=True, ) # Prepare MQTT client_ttn = mqtt.Client() client_ttn.enable_logger() client_ttn.on_connect = on_connect_ttn client_ttn.on_message = on_message_ttn client_ttn.on_log = on_log client_ttn.username_pw_set(SRC_MQTT_USER,SRC_MQTT_PASS) client_ttn.tls_set() client_ttn.connect(SRC_MQTT_HOST, 8883, 60) client_ot = mqtt.Client() client_ot.enable_logger() client_ot.on_connect = on_connect_ot client_ot.on_publish = on_publish_ot client_ot.on_log = on_log client_ot.username_pw_set(DST_MQTT_USER,DST_MQTT_PASS) client_ot.tls_set() client_ot.connect(DST_MQTT_HOST, 8883, 60) try: # Connect to MQTT and react to messages client_ot.loop_start() client_ttn.loop_forever() except KeyboardInterrupt: client_ot.disconnect() client_ot.loop_stop() client_ttn.disconnect() client_ttn.loop_stop() logging.info("tschuess")