script per device
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
This commit is contained in:
parent
ac19187352
commit
41c9b481f8
|
@ -1 +1,2 @@
|
||||||
.env
|
.env
|
||||||
|
.env2
|
||||||
|
|
|
@ -2,6 +2,7 @@ import paho.mqtt.client as mqtt
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
import signal
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from influxdb import InfluxDBClient
|
from influxdb import InfluxDBClient
|
||||||
from dotenv import find_dotenv, load_dotenv
|
from dotenv import find_dotenv, load_dotenv
|
||||||
|
@ -34,7 +35,7 @@ def on_publish_ot(client, userdata, rc):
|
||||||
def on_log(client, userdata, level, buf):
|
def on_log(client, userdata, level, buf):
|
||||||
logging_level = mqtt.LOGGING_LEVEL[level]
|
logging_level = mqtt.LOGGING_LEVEL[level]
|
||||||
logging.log(logging_level, buf)
|
logging.log(logging_level, buf)
|
||||||
logging.info("got a log message for %s level %s: %s", str(userdata), level, str(buf))
|
logging.info("got a log message level %s: %s", str(userdata), level, str(buf))
|
||||||
|
|
||||||
# The callback for when a PUBLISH message is received from the server.
|
# The callback for when a PUBLISH message is received from the server.
|
||||||
def on_message_ttn(client, userdata, msg):
|
def on_message_ttn(client, userdata, msg):
|
||||||
|
@ -81,6 +82,7 @@ def on_message_ttn(client, userdata, msg):
|
||||||
"alarm": int(data["payload_fields"]["alarm"]),
|
"alarm": int(data["payload_fields"]["alarm"]),
|
||||||
"counter": data["counter"],
|
"counter": data["counter"],
|
||||||
"airtime": data["metadata"]["airtime"],
|
"airtime": data["metadata"]["airtime"],
|
||||||
|
"rssi": data["metadata"]["gateways"][0]["rssi"],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
@ -90,49 +92,62 @@ def on_message_ttn(client, userdata, msg):
|
||||||
|
|
||||||
logging.info("data processing done")
|
logging.info("data processing done")
|
||||||
|
|
||||||
logging.basicConfig(
|
def shutdown():
|
||||||
level=logging.INFO,
|
logging.info("disconnecting from mqtt")
|
||||||
format='%(asctime)s - %(message)s',
|
|
||||||
datefmt='%Y-%m-%d %H:%M:%S %Z'
|
|
||||||
)
|
|
||||||
|
|
||||||
# Prepare InfluxDB
|
|
||||||
influxdb = InfluxDBClient(
|
|
||||||
host=DST_INFLUX_HOST,
|
|
||||||
port=443,
|
|
||||||
database=DST_INFLUX_DB,
|
|
||||||
username=DST_INFLUX_USER,
|
|
||||||
password=DST_INFLUX_PASS,
|
|
||||||
ssl=True,
|
|
||||||
verify_ssl=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Prepare MQTT
|
|
||||||
client_ttn = mqtt.Client()
|
|
||||||
client_ttn.enable_logger()
|
|
||||||
client_ttn.on_connect = on_connect_ttn
|
|
||||||
client_ttn.on_message = on_message_ttn
|
|
||||||
client_ttn.on_log = on_log
|
|
||||||
client_ttn.username_pw_set(SRC_MQTT_USER,SRC_MQTT_PASS)
|
|
||||||
client_ttn.tls_set()
|
|
||||||
client_ttn.connect(SRC_MQTT_HOST, 8883, 60)
|
|
||||||
|
|
||||||
client_ot = mqtt.Client()
|
|
||||||
client_ot.enable_logger()
|
|
||||||
client_ot.on_connect = on_connect_ot
|
|
||||||
client_ot.on_publish = on_publish_ot
|
|
||||||
client_ot.on_log = on_log
|
|
||||||
client_ot.username_pw_set(DST_MQTT_USER,DST_MQTT_PASS)
|
|
||||||
client_ot.tls_set()
|
|
||||||
client_ot.connect(DST_MQTT_HOST, 8883, 60)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Connect to MQTT and react to messages
|
|
||||||
client_ot.loop_start()
|
|
||||||
client_ttn.loop_forever()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
client_ot.disconnect()
|
client_ot.disconnect()
|
||||||
client_ot.loop_stop()
|
client_ot.loop_stop()
|
||||||
client_ttn.disconnect()
|
client_ttn.disconnect()
|
||||||
client_ttn.loop_stop()
|
client_ttn.loop_stop()
|
||||||
logging.info("tschuess")
|
|
||||||
|
def handleSIGTERM(signalNumber, frame):
|
||||||
|
logging.info("got SIGTERM")
|
||||||
|
shutdown()
|
||||||
|
return
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
signal.signal(signal.SIGTERM, handleSIGTERM)
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S %Z'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Prepare InfluxDB
|
||||||
|
influxdb = InfluxDBClient(
|
||||||
|
host=DST_INFLUX_HOST,
|
||||||
|
port=443,
|
||||||
|
database=DST_INFLUX_DB,
|
||||||
|
username=DST_INFLUX_USER,
|
||||||
|
password=DST_INFLUX_PASS,
|
||||||
|
ssl=True,
|
||||||
|
verify_ssl=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Prepare MQTT
|
||||||
|
client_ttn = mqtt.Client()
|
||||||
|
client_ttn.enable_logger()
|
||||||
|
client_ttn.on_connect = on_connect_ttn
|
||||||
|
client_ttn.on_message = on_message_ttn
|
||||||
|
client_ttn.on_log = on_log
|
||||||
|
client_ttn.username_pw_set(SRC_MQTT_USER,SRC_MQTT_PASS)
|
||||||
|
client_ttn.tls_set()
|
||||||
|
client_ttn.connect(SRC_MQTT_HOST, 8883, 60)
|
||||||
|
|
||||||
|
client_ot = mqtt.Client()
|
||||||
|
client_ot.enable_logger()
|
||||||
|
client_ot.on_connect = on_connect_ot
|
||||||
|
client_ot.on_publish = on_publish_ot
|
||||||
|
client_ot.on_log = on_log
|
||||||
|
client_ot.username_pw_set(DST_MQTT_USER,DST_MQTT_PASS)
|
||||||
|
client_ot.tls_set()
|
||||||
|
client_ot.connect(DST_MQTT_HOST, 8883, 60)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Connect to MQTT and react to messages
|
||||||
|
client_ot.loop_start()
|
||||||
|
client_ttn.loop_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
shutdown()
|
||||||
|
logging.info("tschuess")
|
|
@ -0,0 +1,103 @@
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
from datetime import datetime
|
||||||
|
from influxdb import InfluxDBClient
|
||||||
|
from dotenv import find_dotenv, load_dotenv
|
||||||
|
|
||||||
|
load_dotenv(find_dotenv(filename=".env2"))
|
||||||
|
SRC_MQTT_HOST = os.getenv("SRC_MQTT_HOST")
|
||||||
|
SRC_MQTT_USER = os.getenv("SRC_MQTT_USER")
|
||||||
|
SRC_MQTT_PASS = os.getenv("SRC_MQTT_PASS")
|
||||||
|
DST_INFLUX_HOST = os.getenv("DST_INFLUX_HOST")
|
||||||
|
DST_INFLUX_USER = os.getenv("DST_INFLUX_USER")
|
||||||
|
DST_INFLUX_PASS = os.getenv("DST_INFLUX_PASS")
|
||||||
|
DST_INFLUX_DB = os.getenv("DST_INFLUX_DB")
|
||||||
|
|
||||||
|
def on_connect_ttn(client, userdata, flags, rc):
|
||||||
|
logging.info("connected to ttn %s - %s", SRC_MQTT_HOST, str(rc))
|
||||||
|
client.subscribe("+/devices/+/up")
|
||||||
|
|
||||||
|
def on_log(client, userdata, level, buf):
|
||||||
|
logging_level = mqtt.LOGGING_LEVEL[level]
|
||||||
|
logging.log(logging_level, buf)
|
||||||
|
logging.info("got a log message level %s: %s", str(userdata), level, str(buf))
|
||||||
|
|
||||||
|
# The callback for when a PUBLISH message is received from the server.
|
||||||
|
def on_message_ttn(client, userdata, msg):
|
||||||
|
data = json.loads(msg.payload)
|
||||||
|
logging.info("message from ttn received for %s - #%s", data["dev_id"], data["counter"])
|
||||||
|
logging.info("received via gw %s", data["metadata"]["gateways"][0]["gtw_id"])
|
||||||
|
|
||||||
|
# write to influxdb
|
||||||
|
logging.info("writing data to influxdb")
|
||||||
|
influxdb.write_points(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"measurement": "risinghf",
|
||||||
|
"tags": {
|
||||||
|
"device": "rhf1s001",
|
||||||
|
},
|
||||||
|
"fields": {
|
||||||
|
"battery": data["payload_fields"]["battery"],
|
||||||
|
"hum": data["payload_fields"]["hum"],
|
||||||
|
"temp": data["payload_fields"]["temp"],
|
||||||
|
"counter": data["counter"],
|
||||||
|
"rssi": data["metadata"]["gateways"][0]["rssi"],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
logging.info("data processing done")
|
||||||
|
|
||||||
|
def shutdown():
|
||||||
|
logging.info("disconnecting from mqtt")
|
||||||
|
client_ttn.disconnect()
|
||||||
|
client_ttn.loop_stop()
|
||||||
|
|
||||||
|
def handleSIGTERM(signalNumber, frame):
|
||||||
|
logging.info("got SIGTERM")
|
||||||
|
shutdown()
|
||||||
|
return
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
signal.signal(signal.SIGTERM, handleSIGTERM)
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S %Z'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Prepare InfluxDB
|
||||||
|
influxdb = InfluxDBClient(
|
||||||
|
host=DST_INFLUX_HOST,
|
||||||
|
port=443,
|
||||||
|
database=DST_INFLUX_DB,
|
||||||
|
username=DST_INFLUX_USER,
|
||||||
|
password=DST_INFLUX_PASS,
|
||||||
|
ssl=True,
|
||||||
|
verify_ssl=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Prepare MQTT
|
||||||
|
client_ttn = mqtt.Client()
|
||||||
|
client_ttn.enable_logger()
|
||||||
|
client_ttn.on_connect = on_connect_ttn
|
||||||
|
client_ttn.on_message = on_message_ttn
|
||||||
|
client_ttn.on_log = on_log
|
||||||
|
client_ttn.username_pw_set(SRC_MQTT_USER,SRC_MQTT_PASS)
|
||||||
|
client_ttn.tls_set()
|
||||||
|
client_ttn.connect(SRC_MQTT_HOST, 8883, 60)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Connect to MQTT and react to messages
|
||||||
|
client_ttn.loop_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
client_ttn.disconnect()
|
||||||
|
client_ttn.loop_stop()
|
||||||
|
logging.info("tschuess")
|
Loading…
Reference in New Issue