From 328ebe7dd2d178d1bae5f30052dfa4350bfc966b Mon Sep 17 00:00:00 2001 From: Tobias Brunner Date: Mon, 18 Oct 2021 07:47:23 +0200 Subject: [PATCH] adapt to ttnv3 and black --- lgt92.py | 155 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 92 insertions(+), 63 deletions(-) diff --git a/lgt92.py b/lgt92.py index 53cff76..67435b5 100644 --- a/lgt92.py +++ b/lgt92.py @@ -22,37 +22,47 @@ DST_INFLUX_DB = os.getenv("DST_INFLUX_DB") HC_PING_URL = os.getenv("HC_PING_URL") VERSION = "v2.0" -OT_TOPIC="owntracks/tobru/dragino" -OT_TID="dragino" +OT_TOPIC = "owntracks/tobru/dragino" +OT_TID = "dragino" + def on_connect_ttn(client, userdata, flags, rc): logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc)) client.subscribe("v3/+/devices/+/up") + def on_connect_ot(client, userdata, flags, rc): logging.info("connected to ot %s - %s", DST_MQTT_HOST, str(rc)) + def on_publish_ot(client, userdata, rc): logging.info("published data to ot") + def on_log(client, userdata, level, buf): logging_level = mqtt.LOGGING_LEVEL[level] logging.log(logging_level, buf) - #logging.info("got a log message level %s: %s", level, str(buf)) + # logging.info("got a log message level %s: %s", level, str(buf)) if "PINGRESP" in str(buf): # report to https://healthchecks.io to tell that the connection is alive requests.get(HC_PING_URL) + # The callback for when a PUBLISH message is received from the server. def on_message_ttn(client, userdata, msg): data = json.loads(msg.payload) - logging.info("message from ttn received for %s - #%s", data["dev_id"], data["counter"]) + logging.info( + "message from ttn received for %s", data["end_device_ids"]["device_id"] + ) # retrieve info about gateway - gtw_id = data["metadata"]["gateways"][0]["gtw_id"] + gtw_id = data["rx_metadata"][0]["gateway_ids"]["gateway_id"] try: - gtw_info = requests.get("https://www.thethingsnetwork.org/gateway-data/gateway/"+gtw_id).json() - logging.info("received via gw %s, %s, owned by %s", + gtw_info = requests.get( + "https://www.thethingsnetwork.org/gateway-data/gateway/" + gtw_id + ).json() + logging.info( + "received via gw %s, %s, owned by %s", data["metadata"]["gateways"][0]["gtw_id"], gtw_info[gtw_id]["description"], gtw_info[gtw_id]["owner"], @@ -61,38 +71,47 @@ def on_message_ttn(client, userdata, msg): logging.info("received via gw %s", gtw_id) # max is 4 volts, 3 volts is considered empty - batpercent = round((data["payload_fields"]["BatV"] - 3) * 100) + batpercent = round((data["uplink_message"]["decoded_payload"]["BatV"] - 3) * 100) - if data["payload_fields"]["ALARM_status"]: + if data["uplink_message"]["decoded_payload"]["ALARM_status"]: logging.info("Red button pushed!") - logging.info("Motion detection: %s", data["payload_fields"]["MD"]) - logging.info("LED status for position: %s", data["payload_fields"]["LON"]) - logging.info("Firmware version: %s", data["payload_fields"]["FW"]) + logging.info( + "Motion detection: %s", data["uplink_message"]["decoded_payload"]["MD"] + ) + logging.info( + "LED status for position: %s", data["uplink_message"]["decoded_payload"]["LON"] + ) + logging.info( + "Firmware version: %s", data["uplink_message"]["decoded_payload"]["FW"] + ) got_fix = False - if data["payload_fields"]["Latitude"] == 0: + if data["uplink_message"]["decoded_payload"]["Latitude"] == 0: logging.info("no GPS data (Latitude) present") # set GPS data to 0 for InfluxDB - data["payload_fields"]["Latitude"] = 0.0 - data["payload_fields"]["Longitude"] = 0.0 + data["uplink_message"]["decoded_payload"]["Latitude"] = 0.0 + data["uplink_message"]["decoded_payload"]["Longitude"] = 0.0 else: - logging.info("GPS data (Latitude) present: lat %s, lon %s", - data["payload_fields"]["Latitude"], - data["payload_fields"]["Longitude"] + logging.info( + "GPS data (Latitude) present: lat %s, lon %s", + data["uplink_message"]["decoded_payload"]["Latitude"], + data["uplink_message"]["decoded_payload"]["Longitude"], ) got_fix = True # transform received data into OwnTracks format - ot_data = json.dumps({ - "_type": "location", - "lat": data["payload_fields"]["Latitude"], - "lon": data["payload_fields"]["Longitude"], - "batt": batpercent, - "t": "p", - "tid": OT_TID, - "tst": int(datetime.timestamp(datetime.now())), - "conn": "m", - }) + ot_data = json.dumps( + { + "_type": "location", + "lat": data["uplink_message"]["decoded_payload"]["Latitude"], + "lon": data["uplink_message"]["decoded_payload"]["Longitude"], + "batt": batpercent, + "t": "p", + "tid": OT_TID, + "tst": int(datetime.timestamp(datetime.now())), + "conn": "m", + } + ) # publish to owntracks logging.info("publishing data to owntracks via mqtt to topic %s", OT_TOPIC) @@ -101,28 +120,35 @@ def on_message_ttn(client, userdata, msg): # write to influxdb logging.info("writing data to influxdb") influxdb.write_points( - [{ - "measurement": "dragino", - "tags": { - "device": "lgt92", - }, - "fields": { - "bat": float(data["payload_fields"]["BatV"]), - "pitch": float(data["payload_fields"]["Pitch"]), - "roll": float(data["payload_fields"]["Roll"]), - "lat": float(data["payload_fields"]["Latitude"]), - "lon": float(data["payload_fields"]["Longitude"]), - "alarm": int(data["payload_fields"]["ALARM_status"]), - "counter": data["counter"], - "airtime": data["metadata"]["airtime"], - "rssi": data["metadata"]["gateways"][0]["rssi"], - "fix": got_fix, + [ + { + "measurement": "dragino", + "tags": { + "device": "lgt92", + }, + "fields": { + "bat": float(data["uplink_message"]["decoded_payload"]["BatV"]), + "pitch": float(data["uplink_message"]["decoded_payload"]["Pitch"]), + "roll": float(data["uplink_message"]["decoded_payload"]["Roll"]), + "lat": float(data["uplink_message"]["decoded_payload"]["Latitude"]), + "lon": float( + data["uplink_message"]["decoded_payload"]["Longitude"] + ), + "alarm": int( + data["uplink_message"]["decoded_payload"]["ALARM_status"] + ), + "counter": data["counter"], + "airtime": data["metadata"]["airtime"], + "rssi": data["metadata"]["gateways"][0]["rssi"], + "fix": got_fix, + }, } - }] + ] ) logging.info("data processing done") + def shutdown(): logging.info("disconnecting from mqtt") client_ot.disconnect() @@ -130,32 +156,34 @@ def shutdown(): client_ttn.disconnect() client_ttn.loop_stop() + def handleSIGTERM(signalNumber, frame): logging.info("got SIGTERM") shutdown() return -if __name__ == '__main__': + +if __name__ == "__main__": signal.signal(signal.SIGTERM, handleSIGTERM) logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S %Z' + format="%(asctime)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S %Z", ) - logging.info("Starting ioteer lgt92. "+VERSION) + logging.info("Starting ioteer lgt92. " + VERSION) # Prepare InfluxDB influxdb = InfluxDBClient( - host=DST_INFLUX_HOST, - port=443, - database=DST_INFLUX_DB, - username=DST_INFLUX_USER, - password=DST_INFLUX_PASS, - ssl=True, - verify_ssl=True, + host=DST_INFLUX_HOST, + port=443, + database=DST_INFLUX_DB, + username=DST_INFLUX_USER, + password=DST_INFLUX_PASS, + ssl=True, + verify_ssl=True, ) # Prepare MQTT for The Things Network @@ -164,21 +192,23 @@ if __name__ == '__main__': client_ttn.on_connect = on_connect_ttn client_ttn.on_message = on_message_ttn client_ttn.on_log = on_log - client_ttn.username_pw_set(SRC_MQTT_USER,SRC_MQTT_PASS) + client_ttn.username_pw_set(SRC_MQTT_USER, SRC_MQTT_PASS) client_ttn.tls_set() client_ttn.connect(SRC_MQTT_HOST, 8883, 60) # Prepare MQTT for OwnTracks - ot_lwt = json.dumps({ - "_type": "lwt", - "tst": int(datetime.timestamp(datetime.now())), - }) + ot_lwt = json.dumps( + { + "_type": "lwt", + "tst": int(datetime.timestamp(datetime.now())), + } + ) client_ot = mqtt.Client() client_ot.enable_logger() client_ot.on_connect = on_connect_ot client_ot.on_publish = on_publish_ot client_ot.on_log = on_log - client_ot.username_pw_set(DST_MQTT_USER,DST_MQTT_PASS) + client_ot.username_pw_set(DST_MQTT_USER, DST_MQTT_PASS) client_ot.tls_set() client_ot.will_set(OT_TOPIC, payload=ot_lwt, qos=1, retain=True) client_ot.connect(DST_MQTT_HOST, 8883, 60) @@ -190,4 +220,3 @@ if __name__ == '__main__': except KeyboardInterrupt: shutdown() logging.info("tschuess") -