ioteer/rhf1s001.py

114 lines
3.2 KiB
Python
Raw Normal View History

2020-05-02 12:43:30 +00:00
import paho.mqtt.client as mqtt
import json
import os
import logging
import signal
from datetime import datetime
from influxdb import InfluxDBClient
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv(filename=".env2"))
SRC_MQTT_HOST = os.getenv("SRC_MQTT_HOST")
SRC_MQTT_USER = os.getenv("SRC_MQTT_USER")
SRC_MQTT_PASS = os.getenv("SRC_MQTT_PASS")
DST_INFLUX_HOST = os.getenv("DST_INFLUX_HOST")
DST_INFLUX_USER = os.getenv("DST_INFLUX_USER")
DST_INFLUX_PASS = os.getenv("DST_INFLUX_PASS")
DST_INFLUX_DB = os.getenv("DST_INFLUX_DB")
2021-10-17 10:59:33 +00:00
2020-05-02 12:43:30 +00:00
def on_connect_ttn(client, userdata, flags, rc):
    """Handle the broker's reply to the connection request.

    Subscribes to the uplink topic of every device of every application
    visible to the account, but only once the broker has accepted the
    connection.

    Args:
        client: The MQTT client instance that invoked this callback.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; ``0`` means the connection was accepted.
    """
    if rc != 0:
        # A non-zero result means the broker refused the connection, so
        # reporting "connected" and subscribing would be misleading.
        logging.error(
            "connection to ttn %s refused - %s", SRC_MQTT_HOST, str(rc)
        )
        return

    logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc))
    # Subscribing here (rather than once at start-up) re-establishes the
    # subscription each time this callback fires for a successful connect.
    client.subscribe("v3/+/devices/+/up")
2020-05-02 12:43:30 +00:00
2021-10-17 10:59:33 +00:00
2020-05-02 12:43:30 +00:00
def on_log(client, userdata, level, buf):
    """Relay a log record produced by the MQTT client to ``logging``.

    Args:
        client: Unused.
        userdata: Unused.
        level: Client log level; used as the key into ``mqtt.LOGGING_LEVEL``
            to obtain the level handed to ``logging.log``.
        buf: The log message.
    """
    # First emission: at the level looked up for this record.
    logging.log(mqtt.LOGGING_LEVEL[level], buf)
    # Second emission: always at INFO, including the raw client level.
    # NOTE(review): every record is therefore logged twice - confirm that
    # this is intended.
    logging.info("got a log message level %s: %s", level, str(buf))
2020-05-02 12:43:30 +00:00
2021-10-17 10:59:33 +00:00
2020-05-02 12:43:30 +00:00
def _build_point(uplink):
    """Build the one-element InfluxDB batch for an ``uplink_message`` dict.

    Raises KeyError/IndexError/TypeError when an expected entry is absent.
    """
    payload = uplink["decoded_payload"]
    return [
        {
            "measurement": "risinghf",
            "tags": {
                "device": "rhf1s001",
            },
            "fields": {
                "battery": payload["battery"],
                "hum": payload["hum"],
                "temp": payload["temp"],
                "counter": uplink["f_cnt"],
                # Signal strength as reported by the first listed gateway.
                "rssi": uplink["rx_metadata"][0]["rssi"],
            },
        }
    ]


# The callback for when a PUBLISH message is received from the server.
def on_message_ttn(client, userdata, msg):
    """Store the sensor readings of one uplink message in InfluxDB.

    The JSON payload is decoded, the battery, humidity and temperature
    readings plus frame counter and RSSI are extracted, and a single point
    is written through the module-level ``influxdb`` client.

    A message that is not valid JSON or lacks any of the expected entries
    is logged and dropped, so that one bad uplink does not raise out of
    the MQTT callback.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only its ``payload`` attribute is read.
    """
    try:
        uplink = json.loads(msg.payload)["uplink_message"]
        gateway_id = uplink["rx_metadata"][0]["gateway_ids"]["gateway_id"]
        point = _build_point(uplink)
    except (ValueError, KeyError, IndexError, TypeError) as err:
        # ValueError covers invalid JSON and undecodable bytes; the others
        # cover missing keys, an empty rx_metadata list and wrong shapes.
        logging.warning("ignoring malformed uplink message: %r", err)
        return

    logging.info("new data received via gw %s", gateway_id)

    # write to influxdb
    fields = point[0]["fields"]
    logging.info(
        "writing data to influxdb: batt=%s, hum=%s, temp=%s",
        fields["battery"],
        fields["hum"],
        fields["temp"],
    )
    influxdb.write_points(point)
    logging.info("data processing done")
2021-10-17 10:59:33 +00:00
2020-05-02 12:43:30 +00:00
def shutdown():
    """Disconnect the module-level MQTT client and stop its network loop."""
    logging.info("disconnecting from mqtt")
    mqtt_client = client_ttn
    mqtt_client.disconnect()
    mqtt_client.loop_stop()
2021-10-17 10:59:33 +00:00
2020-05-02 12:43:30 +00:00
def handleSIGTERM(signalNumber, frame):
    """Signal handler: log the received SIGTERM and shut the MQTT client down.

    Args:
        signalNumber: Unused.
        frame: Unused.
    """
    logging.info("got SIGTERM")
    shutdown()
2021-10-17 10:59:33 +00:00
if __name__ == "__main__":
    # Script entry point: configure logging, create the InfluxDB and MQTT
    # clients, then process uplink messages until interrupted or terminated.
    signal.signal(signal.SIGTERM, handleSIGTERM)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %Z",
    )

    # Fail fast with a readable message instead of an obscure error later
    # on when a setting that has no usable default is missing.
    required_settings = {
        "SRC_MQTT_HOST": SRC_MQTT_HOST,
        "DST_INFLUX_HOST": DST_INFLUX_HOST,
        "DST_INFLUX_DB": DST_INFLUX_DB,
    }
    missing_settings = [
        name for name, value in required_settings.items() if not value
    ]
    if missing_settings:
        raise SystemExit(
            "missing required settings: " + ", ".join(missing_settings)
        )

    # Prepare InfluxDB
    influxdb = InfluxDBClient(
        host=DST_INFLUX_HOST,
        port=443,
        database=DST_INFLUX_DB,
        username=DST_INFLUX_USER,
        password=DST_INFLUX_PASS,
        ssl=True,
        verify_ssl=True,
    )

    # Prepare MQTT
    client_ttn = mqtt.Client()
    client_ttn.enable_logger()
    client_ttn.on_connect = on_connect_ttn
    client_ttn.on_message = on_message_ttn
    client_ttn.on_log = on_log
    client_ttn.username_pw_set(SRC_MQTT_USER, SRC_MQTT_PASS)
    client_ttn.tls_set()
    client_ttn.connect(SRC_MQTT_HOST, 8883, 60)

    try:
        # Connect to MQTT and react to messages
        client_ttn.loop_forever()
    except KeyboardInterrupt:
        # Same teardown as the SIGTERM path.
        shutdown()

    # Logged on every orderly exit (Ctrl-C as well as SIGTERM).
    logging.info("tschuess")