This repository has been archived on 2023-04-02. You can view files and clone it, but cannot push or open issues or pull requests.
gitops-tbrnt/mqttwarn/cm-transformers.yaml
Tobias Brunner d45d5ba2e3
All checks were successful
continuous-integration/drone/push Build is passing
add rssi
2023-01-20 21:45:07 +01:00

115 lines
4.6 KiB
YAML

# Kubernetes ConfigMap named "transformers"; its single data key
# "transformers.py" carries the Python module that follows as a block scalar.
apiVersion: v1
kind: ConfigMap
metadata:
name: transformers
data:
transformers.py: |
import ast
import json
import logging
def laq4_values(topic, data, srv=None):
    """Flatten an uplink message into the readings reported for ``laq4``.

    Args:
        topic: Topic the message arrived on (not used by this function).
        data: Mapping whose ``"payload"`` entry holds the serialized uplink
            message (JSON text, or a Python-literal rendering of the dict).
        srv: Optional object supplied by the caller (not used here).

    Returns:
        dict with the keys ``battery``, ``temperature``, ``humidity``,
        ``tvoc_ppb``, ``co2_ppm``, ``rssi``, ``consumed_airtime`` and
        ``gateway_id``. ``consumed_airtime`` stays a string with every
        ``"s"`` character removed (``"0.06s"`` -> ``"0.06"``).

    Raises:
        ValueError, SyntaxError: The payload is neither valid JSON nor a
            valid Python literal.
        KeyError, IndexError: An expected field is missing or
            ``rx_metadata`` is empty.
    """
    raw = data["payload"]
    try:
        # Parse as JSON first: ast.literal_eval rejects the JSON literals
        # true/false/null, so such messages used to be dropped with an error.
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # Keep accepting Python-literal payloads exactly as before.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    # Only the first rx_metadata entry is reported.
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        battery=decoded["Bat_V"],
        temperature=decoded["TempC_SHT"],
        humidity=decoded["Hum_SHT"],
        tvoc_ppb=decoded["TVOC_ppb"],
        co2_ppm=decoded["CO2_ppm"],
        rssi=first_rx["rssi"],
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d
def lht52_values(topic, data, srv=None):
    """Flatten an uplink message into the readings reported for ``lht52``.

    Args:
        topic: Topic the message arrived on (not used by this function).
        data: Mapping whose ``"payload"`` entry holds the serialized uplink
            message (JSON text, or a Python-literal rendering of the dict).
        srv: Optional object supplied by the caller (not used here).

    Returns:
        dict with the keys ``device_id``, ``temperature``, ``humidity``,
        ``consumed_airtime``, ``rssi`` and ``gateway_id``.
        ``consumed_airtime`` stays a string with every ``"s"`` character
        removed (``"0.06s"`` -> ``"0.06"``).

    Raises:
        ValueError, SyntaxError: The payload is neither valid JSON nor a
            valid Python literal.
        KeyError, IndexError: An expected field is missing or
            ``rx_metadata`` is empty.
    """
    raw = data["payload"]
    try:
        # Parse as JSON first: ast.literal_eval rejects the JSON literals
        # true/false/null, so such messages used to be dropped with an error.
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # Keep accepting Python-literal payloads exactly as before.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    # Only the first rx_metadata entry is reported.
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        device_id=payload["end_device_ids"]["device_id"],
        temperature=decoded["TempC_SHT"],
        humidity=decoded["Hum_SHT"],
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        rssi=first_rx["rssi"],
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d
def rhf1s001_values(topic, data, srv=None):
    """Flatten an uplink message into the readings reported for ``rhf1s001``.

    Args:
        topic: Topic the message arrived on (not used by this function).
        data: Mapping whose ``"payload"`` entry holds the serialized uplink
            message (JSON text, or a Python-literal rendering of the dict).
        srv: Optional object supplied by the caller (not used here).

    Returns:
        dict with the keys ``battery``, ``temperature``, ``humidity``,
        ``rssi``, ``consumed_airtime`` and ``gateway_id``.
        ``consumed_airtime`` stays a string with every ``"s"`` character
        removed (``"0.06s"`` -> ``"0.06"``).

    Raises:
        ValueError, SyntaxError: The payload is neither valid JSON nor a
            valid Python literal.
        KeyError, IndexError: An expected field is missing or
            ``rx_metadata`` is empty.
    """
    raw = data["payload"]
    try:
        # Parse as JSON first: ast.literal_eval rejects the JSON literals
        # true/false/null, so such messages used to be dropped with an error.
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # Keep accepting Python-literal payloads exactly as before.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    # Only the first rx_metadata entry is reported.
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        battery=decoded["battery"],
        temperature=decoded["temp"],
        humidity=decoded["hum"],
        rssi=first_rx["rssi"],
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d
def lds02_values(topic, data, srv=None):
    """Flatten an uplink message into the readings reported for ``lds02``.

    Args:
        topic: Topic the message arrived on; echoed back under ``Topic``.
        data: Mapping whose ``"payload"`` entry holds the serialized uplink
            message (JSON text, or a Python-literal rendering of the dict).
        srv: Optional object supplied by the caller (not used here).

    Returns:
        dict with the keys ``Topic``, ``Battery``, ``Alarm``,
        ``DoorOpenStatus``, ``DoorOpenTimes``, ``LastDoorOpenDuration``,
        ``Mod``, ``rssi``, ``consumed_airtime`` and ``gateway_id``.
        ``consumed_airtime`` stays a string with every ``"s"`` character
        removed (``"0.06s"`` -> ``"0.06"``).

    Raises:
        ValueError, SyntaxError: The payload is neither valid JSON nor a
            valid Python literal.
        KeyError, IndexError: An expected field is missing or
            ``rx_metadata`` is empty.
    """
    raw = data["payload"]
    try:
        # Parse as JSON first: ast.literal_eval rejects the JSON literals
        # true/false/null, so such messages used to be dropped with an error.
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # Keep accepting Python-literal payloads exactly as before.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    # Only the first rx_metadata entry is reported.
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        Topic=topic,
        Battery=decoded["BAT_V"],
        Alarm=decoded["ALARM"],
        DoorOpenStatus=decoded["DOOR_OPEN_STATUS"],
        DoorOpenTimes=decoded["DOOR_OPEN_TIMES"],
        LastDoorOpenDuration=decoded["LAST_DOOR_OPEN_DURATION"],
        Mod=decoded["MOD"],
        rssi=first_rx["rssi"],
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d
def pycomruuvi_values(sensorname, payload, topic):
    """Pick one named sensor's readings out of an already-parsed uplink message.

    Args:
        sensorname: Key under ``uplink_message.decoded_payload`` that selects
            the sensor entry to report.
        payload: Parsed uplink message (nested mapping).
        topic: Topic the message arrived on; echoed back under ``topic``.

    Returns:
        dict with the keys ``topic``, ``battery`` (the sensor's ``battery``
        value divided by 1000), ``humidity``, ``rssi`` and ``temperature``.
    """
    readings = payload["uplink_message"]["decoded_payload"][sensorname]
    result = {
        "topic": topic,
        "battery": readings["battery"] / 1000,
        "humidity": readings["humidity"],
        "rssi": readings["rssi"],
        "temperature": readings["temperature"],
    }
    logging.debug(result)
    return result
def pycomruuvi_values_camperentry(topic, data, srv=None):
    """Report the ``camperentry`` sensor readings from an uplink message.

    Args:
        topic: Topic the message arrived on; forwarded to the result.
        data: Mapping whose ``"payload"`` entry holds the serialized uplink
            message (JSON text, or a Python-literal rendering of the dict).
        srv: Optional object supplied by the caller (not used here).

    Returns:
        Whatever ``pycomruuvi_values`` returns for ``sensorname="camperentry"``.

    Raises:
        ValueError, SyntaxError: The payload is neither valid JSON nor a
            valid Python literal.
    """
    raw = data["payload"]
    try:
        # Parse as JSON first: ast.literal_eval rejects the JSON literals
        # true/false/null, so such messages used to be dropped with an error.
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # Keep accepting Python-literal payloads exactly as before.
        payload = ast.literal_eval(raw)
    return pycomruuvi_values(sensorname="camperentry", payload=payload, topic=topic)
def pycomruuvi_values_campersleepingzone(topic, data, srv=None):
    """Report the ``campersleepingzone`` sensor readings from an uplink message.

    Args:
        topic: Topic the message arrived on; forwarded to the result.
        data: Mapping whose ``"payload"`` entry holds the serialized uplink
            message (JSON text, or a Python-literal rendering of the dict).
        srv: Optional object supplied by the caller (not used here).

    Returns:
        Whatever ``pycomruuvi_values`` returns for
        ``sensorname="campersleepingzone"``.

    Raises:
        ValueError, SyntaxError: The payload is neither valid JSON nor a
            valid Python literal.
    """
    raw = data["payload"]
    try:
        # Parse as JSON first: ast.literal_eval rejects the JSON literals
        # true/false/null, so such messages used to be dropped with an error.
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # Keep accepting Python-literal payloads exactly as before.
        payload = ast.literal_eval(raw)
    return pycomruuvi_values(sensorname="campersleepingzone", payload=payload, topic=topic)
def RuuviGollectorDataMap(topic):
    """Derive ``location`` and ``place`` from a ruuvitag-gollector topic.

    The topic is expected to look like ``ruuvitag-gollector/location/place/mac``;
    the second and third ``/``-separated segments are returned.

    Args:
        topic: Topic string to split.

    Returns:
        ``dict(location=..., place=...)`` for a string topic. Both values are
        ``'unknown'`` when the topic has fewer than three segments. ``None``
        when ``topic`` is not a string.
    """
    if not isinstance(topic, str):
        return None

    segments = topic.split('/')
    try:
        location = segments[1]
        place = segments[2]
    except IndexError:
        # The previous handler assigned unrelated names (deviceid/username),
        # leaving location/place unbound and crashing on the return below.
        location = 'unknown'
        place = 'unknown'
    return dict(location=location, place=place)