You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
238 lines
7.4 KiB
238 lines
7.4 KiB
import paho.mqtt.client as mqtt |
|
import json |
|
import os |
|
import logging |
|
import signal |
|
import requests |
|
from datetime import datetime |
|
from influxdb import InfluxDBClient |
|
from dotenv import find_dotenv, load_dotenv |
|
|
|
load_dotenv(find_dotenv()) |
|
SRC_MQTT_HOST = os.getenv("SRC_MQTT_HOST") |
|
SRC_MQTT_USER = os.getenv("SRC_MQTT_USER") |
|
SRC_MQTT_PASS = os.getenv("SRC_MQTT_PASS") |
|
DST_MQTT_HOST = os.getenv("DST_MQTT_HOST") |
|
DST_MQTT_USER = os.getenv("DST_MQTT_USER") |
|
DST_MQTT_PASS = os.getenv("DST_MQTT_PASS") |
|
DST_INFLUX_HOST = os.getenv("DST_INFLUX_HOST") |
|
DST_INFLUX_USER = os.getenv("DST_INFLUX_USER") |
|
DST_INFLUX_PASS = os.getenv("DST_INFLUX_PASS") |
|
DST_INFLUX_DB = os.getenv("DST_INFLUX_DB") |
|
DST_TRACCAR_URL = os.getenv("DST_TRACCAR_URL") |
|
HC_PING_URL = os.getenv("HC_PING_URL") |
|
VERSION = "v3.1" |
|
|
|
OT_TOPIC = "owntracks/tobru/dragino" |
|
OT_TID = "dragino" |
|
|
|
|
|
def on_connect_ttn(client, userdata, flags, rc): |
|
logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc)) |
|
client.subscribe("v3/+/devices/+/up") |
|
|
|
|
|
def on_connect_ot(client, userdata, flags, rc): |
|
logging.info("connected to ot %s - %s", DST_MQTT_HOST, str(rc)) |
|
|
|
|
|
def on_publish_ot(client, userdata, rc): |
|
logging.info("published data to ot") |
|
|
|
|
|
def on_log(client, userdata, level, buf): |
|
logging_level = mqtt.LOGGING_LEVEL[level] |
|
logging.log(logging_level, buf) |
|
# logging.info("got a log message level %s: %s", level, str(buf)) |
|
if "PINGRESP" in str(buf): |
|
# report to https://healthchecks.io to tell that the connection is alive |
|
requests.get(HC_PING_URL) |
|
|
|
|
|
# The callback for when a PUBLISH message is received from the server. |
|
def on_message_ttn(client, userdata, msg): |
|
data = json.loads(msg.payload) |
|
logging.info( |
|
"message from ttn received for %s via %s", |
|
data["end_device_ids"]["device_id"], |
|
data["uplink_message"]["network_ids"]["cluster_id"], |
|
) |
|
|
|
# retrieve info about gateway |
|
gtw_id = data["uplink_message"]["rx_metadata"][0]["gateway_ids"]["gateway_id"] |
|
try: |
|
gtw_info = requests.get( |
|
"https://www.thethingsnetwork.org/gateway-data/gateway/" + gtw_id |
|
).json() |
|
logging.info( |
|
"received via gw %s, %s, owned by %s", |
|
data["metadata"]["gateways"][0]["gtw_id"], |
|
gtw_info[gtw_id]["description"], |
|
gtw_info[gtw_id]["owner"], |
|
) |
|
except: |
|
logging.info("received via gw %s", gtw_id) |
|
|
|
# the decoded data |
|
bat_v = data["uplink_message"]["decoded_payload"]["BatV"] |
|
motion_detection = data["uplink_message"]["decoded_payload"]["MD"] |
|
led_status = data["uplink_message"]["decoded_payload"]["LON"] |
|
fw_version = data["uplink_message"]["decoded_payload"]["FW"] |
|
pitch = data["uplink_message"]["decoded_payload"]["Pitch"] |
|
roll = data["uplink_message"]["decoded_payload"]["Roll"] |
|
latitude = data["uplink_message"]["decoded_payload"]["Latitude"] |
|
longitude = data["uplink_message"]["decoded_payload"]["Longitude"] |
|
altitude = data["uplink_message"]["decoded_payload"]["Altitude"] |
|
if data["uplink_message"]["decoded_payload"]["HDOP"]: |
|
hdop = data["uplink_message"]["decoded_payload"]["HDOP"] |
|
else: |
|
hdop = 0 |
|
rssi = data["uplink_message"]["rx_metadata"][0]["rssi"] |
|
airtime = data["uplink_message"]["consumed_airtime"][:-1] |
|
timestamp = int(datetime.timestamp(datetime.now())) |
|
|
|
# max is 4 volts, 3 volts is considered empty |
|
batpercent = round((bat_v - 3) * 100) |
|
|
|
alarm_status = False |
|
if data["uplink_message"]["decoded_payload"]["ALARM_status"] == "TRUE": |
|
alarm_status = True |
|
logging.info("Red button pushed!") |
|
|
|
logging.info("Motion detection: %s", motion_detection) |
|
logging.info("LED status for position: %s", led_status) |
|
logging.info("Firmware version: %s", fw_version) |
|
|
|
got_fix = False |
|
if latitude == 0: |
|
logging.info("no GPS data (Latitude) present") |
|
# set GPS data to 0 for InfluxDB |
|
latitude = 0.0 |
|
longitude = 0.0 |
|
else: |
|
logging.info( |
|
"GPS data (Latitude) present: lat %s, lon %s", |
|
latitude, |
|
longitude, |
|
) |
|
got_fix = True |
|
# transform received data into OwnTracks format |
|
ot_data = json.dumps( |
|
{ |
|
"_type": "location", |
|
"lat": latitude, |
|
"lon": longitude, |
|
"batt": batpercent, |
|
"t": "p", |
|
"tid": OT_TID, |
|
"tst": timestamp, |
|
"conn": "m", |
|
} |
|
) |
|
|
|
# publish to owntracks |
|
logging.info("publishing data to owntracks via mqtt to topic %s", OT_TOPIC) |
|
client_ot.publish(OT_TOPIC, payload=ot_data, retain=True, qos=1) |
|
|
|
# send to traccar |
|
logging.info("publishing data to traccar") |
|
traccar_url = f"{DST_TRACCAR_URL}/?id={OT_TID}&lat={latitude}&lon={longitude}×tamp={timestamp}&hdop={hdop}&altitude={altitude}&speed=0" |
|
requests.get(traccar_url) |
|
|
|
# write to influxdb |
|
logging.info("writing data to influxdb") |
|
influxdb_points = [ |
|
{ |
|
"measurement": "dragino", |
|
"tags": { |
|
"device": "lgt92", |
|
}, |
|
"fields": { |
|
"bat": float(bat_v), |
|
"pitch": float(pitch), |
|
"roll": float(roll), |
|
"lat": float(latitude), |
|
"lon": float(longitude), |
|
"alarm": int(alarm_status), |
|
"airtime": float(airtime), |
|
"rssi": rssi, |
|
"fix": got_fix, |
|
}, |
|
} |
|
] |
|
influxdb.write_points(influxdb_points) |
|
|
|
logging.info("data processing done") |
|
|
|
|
|
def shutdown(): |
|
logging.info("disconnecting from mqtt") |
|
client_ot.disconnect() |
|
client_ot.loop_stop() |
|
client_ttn.disconnect() |
|
client_ttn.loop_stop() |
|
|
|
|
|
def handleSIGTERM(signalNumber, frame): |
|
logging.info("got SIGTERM") |
|
shutdown() |
|
return |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
signal.signal(signal.SIGTERM, handleSIGTERM) |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s - %(message)s", |
|
datefmt="%Y-%m-%d %H:%M:%S %Z", |
|
) |
|
|
|
logging.info("Starting ioteer lgt92. " + VERSION) |
|
|
|
# Prepare InfluxDB |
|
influxdb = InfluxDBClient( |
|
host=DST_INFLUX_HOST, |
|
port=443, |
|
database=DST_INFLUX_DB, |
|
username=DST_INFLUX_USER, |
|
password=DST_INFLUX_PASS, |
|
ssl=True, |
|
verify_ssl=True, |
|
) |
|
|
|
# Prepare MQTT for The Things Network |
|
client_ttn = mqtt.Client() |
|
client_ttn.enable_logger() |
|
client_ttn.on_connect = on_connect_ttn |
|
client_ttn.on_message = on_message_ttn |
|
client_ttn.on_log = on_log |
|
client_ttn.username_pw_set(SRC_MQTT_USER, SRC_MQTT_PASS) |
|
client_ttn.tls_set() |
|
client_ttn.connect(SRC_MQTT_HOST, 8883, 60) |
|
|
|
# Prepare MQTT for OwnTracks |
|
ot_lwt = json.dumps( |
|
{ |
|
"_type": "lwt", |
|
"tst": int(datetime.timestamp(datetime.now())), |
|
} |
|
) |
|
client_ot = mqtt.Client() |
|
client_ot.enable_logger() |
|
client_ot.on_connect = on_connect_ot |
|
client_ot.on_publish = on_publish_ot |
|
client_ot.on_log = on_log |
|
client_ot.username_pw_set(DST_MQTT_USER, DST_MQTT_PASS) |
|
client_ot.tls_set() |
|
client_ot.will_set(OT_TOPIC, payload=ot_lwt, qos=1, retain=True) |
|
client_ot.connect(DST_MQTT_HOST, 8883, 60) |
|
|
|
try: |
|
# Connect to MQTT and react to messages |
|
client_ot.loop_start() |
|
client_ttn.loop_forever() |
|
except KeyboardInterrupt: |
|
shutdown() |
|
logging.info("tschuess")
|
|
|