You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

184 lines
5.9KB

  1. import paho.mqtt.client as mqtt
  2. import json
  3. import os
  4. import logging
  5. import signal
  6. import requests
  7. from datetime import datetime
  8. from influxdb import InfluxDBClient
  9. from dotenv import find_dotenv, load_dotenv
  10. load_dotenv(find_dotenv())
  11. SRC_MQTT_HOST = os.getenv("SRC_MQTT_HOST")
  12. SRC_MQTT_USER = os.getenv("SRC_MQTT_USER")
  13. SRC_MQTT_PASS = os.getenv("SRC_MQTT_PASS")
  14. DST_MQTT_HOST = os.getenv("DST_MQTT_HOST")
  15. DST_MQTT_USER = os.getenv("DST_MQTT_USER")
  16. DST_MQTT_PASS = os.getenv("DST_MQTT_PASS")
  17. DST_INFLUX_HOST = os.getenv("DST_INFLUX_HOST")
  18. DST_INFLUX_USER = os.getenv("DST_INFLUX_USER")
  19. DST_INFLUX_PASS = os.getenv("DST_INFLUX_PASS")
  20. DST_INFLUX_DB = os.getenv("DST_INFLUX_DB")
  21. HC_PING_URL = os.getenv("HC_PING_URL")
  22. VERSION = "V1.1"
  23. OT_TOPIC="owntracks/tobru/dragino"
  24. OT_TID="dragino"
  25. def on_connect_ttn(client, userdata, flags, rc):
  26. logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc))
  27. client.subscribe("+/devices/+/up")
  28. def on_connect_ot(client, userdata, flags, rc):
  29. logging.info("connected to ot %s - %s", DST_MQTT_HOST, str(rc))
  30. def on_publish_ot(client, userdata, rc):
  31. logging.info("published data to ot")
  32. def on_log(client, userdata, level, buf):
  33. logging_level = mqtt.LOGGING_LEVEL[level]
  34. logging.log(logging_level, buf)
  35. #logging.info("got a log message level %s: %s", level, str(buf))
  36. if "PINGRESP" in str(buf):
  37. # report to https://healthchecks.io to tell that the connection is alive
  38. requests.get(HC_PING_URL)
  39. # The callback for when a PUBLISH message is received from the server.
  40. def on_message_ttn(client, userdata, msg):
  41. data = json.loads(msg.payload)
  42. logging.info("message from ttn received for %s - #%s", data["dev_id"], data["counter"])
  43. # retrieve info about gateway
  44. gtw_id = data["metadata"]["gateways"][0]["gtw_id"]
  45. try:
  46. gtw_info = requests.get("https://www.thethingsnetwork.org/gateway-data/gateway/"+gtw_id).json()
  47. logging.info("received via gw %s, %s, owned by %s",
  48. data["metadata"]["gateways"][0]["gtw_id"],
  49. gtw_info[gtw_id]["description"],
  50. gtw_info[gtw_id]["owner"],
  51. )
  52. except:
  53. logging.info("received via gw %s", gtw_id)
  54. # max is 4 volts, 3 volts is considered empty
  55. batpercent = round((data["payload_fields"]["batV"] - 3) * 100)
  56. if data["payload_fields"]["alarm"]:
  57. print("ALARM button pressed")
  58. got_fix = False
  59. if "latitude" in data["payload_fields"]:
  60. got_fix = True
  61. # transform received data into OwnTracks format
  62. ot_data = json.dumps({
  63. "_type": "location",
  64. "lat": data["payload_fields"]["latitude"],
  65. "lon": data["payload_fields"]["longitude"],
  66. "batt": batpercent,
  67. "t": "p",
  68. "tid": OT_TID,
  69. "tst": int(datetime.timestamp(datetime.now())),
  70. "conn": "m",
  71. })
  72. # publish to owntracks
  73. logging.info("publishing data to owntracks via mqtt %s", OT_TOPIC)
  74. client_ot.publish(OT_TOPIC, payload=ot_data, retain=True, qos=1)
  75. else:
  76. logging.info("no GPS data / latitude present")
  77. # set GPS data to 0 for InfluxDB
  78. data["payload_fields"]["latitude"] = 0.0
  79. data["payload_fields"]["longitude"] = 0.0
  80. # write to influxdb
  81. logging.info("writing data to influxdb")
  82. influxdb.write_points(
  83. [{
  84. "measurement": "dragino",
  85. "tags": {
  86. "device": "lgt92",
  87. },
  88. "fields": {
  89. "bat": data["payload_fields"]["batV"],
  90. "pitch": data["payload_fields"]["pitch"],
  91. "roll": data["payload_fields"]["roll"],
  92. "lat": data["payload_fields"]["latitude"],
  93. "lon": data["payload_fields"]["longitude"],
  94. "alarm": int(data["payload_fields"]["alarm"]),
  95. "counter": data["counter"],
  96. "airtime": data["metadata"]["airtime"],
  97. "rssi": data["metadata"]["gateways"][0]["rssi"],
  98. "fix": got_fix,
  99. }
  100. }]
  101. )
  102. logging.info("data processing done")
  103. def shutdown():
  104. logging.info("disconnecting from mqtt")
  105. client_ot.disconnect()
  106. client_ot.loop_stop()
  107. client_ttn.disconnect()
  108. client_ttn.loop_stop()
  109. def handleSIGTERM(signalNumber, frame):
  110. logging.info("got SIGTERM")
  111. shutdown()
  112. return
  113. if __name__ == '__main__':
  114. signal.signal(signal.SIGTERM, handleSIGTERM)
  115. logging.basicConfig(
  116. level=logging.INFO,
  117. format='%(asctime)s - %(message)s',
  118. datefmt='%Y-%m-%d %H:%M:%S %Z'
  119. )
  120. logging.info("Starting ioteer lgt92. "+VERSION)
  121. # Prepare InfluxDB
  122. influxdb = InfluxDBClient(
  123. host=DST_INFLUX_HOST,
  124. port=443,
  125. database=DST_INFLUX_DB,
  126. username=DST_INFLUX_USER,
  127. password=DST_INFLUX_PASS,
  128. ssl=True,
  129. verify_ssl=True,
  130. )
  131. # Prepare MQTT for The Things Network
  132. client_ttn = mqtt.Client()
  133. client_ttn.enable_logger()
  134. client_ttn.on_connect = on_connect_ttn
  135. client_ttn.on_message = on_message_ttn
  136. client_ttn.on_log = on_log
  137. client_ttn.username_pw_set(SRC_MQTT_USER,SRC_MQTT_PASS)
  138. client_ttn.tls_set()
  139. client_ttn.connect(SRC_MQTT_HOST, 8883, 60)
  140. # Prepare MQTT for OwnTracks
  141. ot_lwt = json.dumps({
  142. "_type": "lwt",
  143. "tst": int(datetime.timestamp(datetime.now())),
  144. })
  145. client_ot = mqtt.Client()
  146. client_ot.enable_logger()
  147. client_ot.on_connect = on_connect_ot
  148. client_ot.on_publish = on_publish_ot
  149. client_ot.on_log = on_log
  150. client_ot.username_pw_set(DST_MQTT_USER,DST_MQTT_PASS)
  151. client_ot.tls_set()
  152. client_ot.will_set(OT_TOPIC, payload=ot_lwt, qos=1, retain=True)
  153. client_ot.connect(DST_MQTT_HOST, 8883, 60)
  154. try:
  155. # Connect to MQTT and react to messages
  156. client_ot.loop_start()
  157. client_ttn.loop_forever()
  158. except KeyboardInterrupt:
  159. shutdown()
  160. logging.info("tschuess")