This repository has been archived on 2023-04-02. You can view files and clone it, but cannot push or open issues or pull requests.
gitops-tbrnt/mqttwarn/cm-transformers.yaml
Tobias Brunner c681fccb63
All checks were successful
continuous-integration/drone/push Build is passing
install mqttwarn
2021-11-03 21:55:27 +01:00

65 lines
2.6 KiB
YAML

apiVersion: v1
kind: ConfigMap
metadata:
name: transformers
data:
transformers.py: |
import ast
import logging
def laq4_values(topic, data, srv=None):
    """Flatten the uplink message in ``data["payload"]`` into a dict of readings.

    The payload text is parsed as JSON first, so JSON-only literals
    (``true``, ``false``, ``null``) anywhere in the message no longer make
    parsing fail.  Text that is not valid JSON (e.g. a Python ``repr`` of a
    dict with single-quoted keys) is still handled through
    ``ast.literal_eval`` as before.

    Args:
        topic: Not used by this function.
        data: Mapping whose ``"payload"`` entry holds the serialized message.
        srv: Not used by this function.

    Returns:
        dict with the keys ``battery``, ``temperature``, ``humidity``,
        ``tvoc_ppb``, ``co2_ppm``, ``rssi``, ``consumed_airtime`` and
        ``gateway_id``.  ``rssi`` and ``gateway_id`` come from the first
        ``rx_metadata`` entry only.

    Raises:
        KeyError, IndexError: If an expected field is missing.
        ValueError, SyntaxError: If the payload is neither valid JSON nor a
            valid Python literal.
    """
    # Imported locally so the function does not rely on the module's import lines.
    import json

    raw = data["payload"]
    try:
        payload = json.loads(raw)
    except ValueError:
        # Not JSON: keep accepting Python-literal text like the original code did.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        battery=decoded["Bat_V"],
        temperature=decoded["TempC_SHT"],
        humidity=decoded["Hum_SHT"],
        tvoc_ppb=decoded["TVOC_ppb"],
        co2_ppm=decoded["CO2_ppm"],
        rssi=first_rx["rssi"],
        # Removes every "s" character (e.g. "0.0617s" -> "0.0617"); stays a string.
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d
def rhf1s001_values(topic, data, srv=None):
    """Flatten the uplink message in ``data["payload"]`` into a dict of readings.

    The payload text is parsed as JSON first, so JSON-only literals
    (``true``, ``false``, ``null``) anywhere in the message no longer make
    parsing fail.  Text that is not valid JSON (e.g. a Python ``repr`` of a
    dict with single-quoted keys) is still handled through
    ``ast.literal_eval`` as before.

    Args:
        topic: Not used by this function.
        data: Mapping whose ``"payload"`` entry holds the serialized message.
        srv: Not used by this function.

    Returns:
        dict with the keys ``battery``, ``temperature``, ``humidity``,
        ``rssi``, ``consumed_airtime`` and ``gateway_id``.  ``rssi`` and
        ``gateway_id`` come from the first ``rx_metadata`` entry only.

    Raises:
        KeyError, IndexError: If an expected field is missing.
        ValueError, SyntaxError: If the payload is neither valid JSON nor a
            valid Python literal.
    """
    # Imported locally so the function does not rely on the module's import lines.
    import json

    raw = data["payload"]
    try:
        payload = json.loads(raw)
    except ValueError:
        # Not JSON: keep accepting Python-literal text like the original code did.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        battery=decoded["battery"],
        temperature=decoded["temp"],
        humidity=decoded["hum"],
        rssi=first_rx["rssi"],
        # Removes every "s" character (e.g. "0.0617s" -> "0.0617"); stays a string.
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d
def lds02_values(topic, data, srv=None):
    """Flatten the uplink message in ``data["payload"]`` into a dict of readings.

    The payload text is parsed as JSON first, so JSON-only literals
    (``true``, ``false``, ``null``) anywhere in the message no longer make
    parsing fail.  Text that is not valid JSON (e.g. a Python ``repr`` of a
    dict with single-quoted keys) is still handled through
    ``ast.literal_eval`` as before.

    Args:
        topic: Copied unchanged into the result under ``Topic``.
        data: Mapping whose ``"payload"`` entry holds the serialized message.
        srv: Not used by this function.

    Returns:
        dict with the keys ``Topic``, ``Battery``, ``Alarm``,
        ``DoorOpenStatus``, ``DoorOpenTimes``, ``LastDoorOpenDuration``,
        ``Mod``, ``rssi``, ``consumed_airtime`` and ``gateway_id``.  ``rssi``
        and ``gateway_id`` come from the first ``rx_metadata`` entry only.

    Raises:
        KeyError, IndexError: If an expected field is missing.
        ValueError, SyntaxError: If the payload is neither valid JSON nor a
            valid Python literal.
    """
    # Imported locally so the function does not rely on the module's import lines.
    import json

    raw = data["payload"]
    try:
        payload = json.loads(raw)
    except ValueError:
        # Not JSON: keep accepting Python-literal text like the original code did.
        payload = ast.literal_eval(raw)

    uplink = payload["uplink_message"]
    decoded = uplink["decoded_payload"]
    first_rx = uplink["rx_metadata"][0]

    d = dict(
        Topic=topic,
        Battery=decoded["BAT_V"],
        Alarm=decoded["ALARM"],
        DoorOpenStatus=decoded["DOOR_OPEN_STATUS"],
        DoorOpenTimes=decoded["DOOR_OPEN_TIMES"],
        LastDoorOpenDuration=decoded["LAST_DOOR_OPEN_DURATION"],
        Mod=decoded["MOD"],
        rssi=first_rx["rssi"],
        # Removes every "s" character (e.g. "0.0617s" -> "0.0617"); stays a string.
        consumed_airtime=uplink["consumed_airtime"].replace("s", ""),
        gateway_id=first_rx["gateway_ids"]["gateway_id"],
    )
    logging.debug(d)
    return d