Tobias Brunner
7123e5741a
All checks were successful
continuous-integration/drone/push Build is passing
79 lines
3 KiB
YAML
79 lines
3 KiB
YAML
apiVersion: v1
|
|
kind: ConfigMap
|
|
metadata:
|
|
name: transformers
|
|
data:
|
|
transformers.py: |
|
|
import ast
|
|
import logging
|
|
|
|
|
|
def laq4_values(topic, data, srv=None):
|
|
payload = ast.literal_eval(data["payload"])
|
|
d = dict(
|
|
battery=payload["uplink_message"]["decoded_payload"]["Bat_V"],
|
|
temperature=payload["uplink_message"]["decoded_payload"]["TempC_SHT"],
|
|
humidity=payload["uplink_message"]["decoded_payload"]["Hum_SHT"],
|
|
tvoc_ppb=payload["uplink_message"]["decoded_payload"]["TVOC_ppb"],
|
|
co2_ppm=payload["uplink_message"]["decoded_payload"]["CO2_ppm"],
|
|
rssi=payload["uplink_message"]["rx_metadata"][0]["rssi"],
|
|
consumed_airtime=payload["uplink_message"]["consumed_airtime"].replace("s", ""),
|
|
gateway_id=payload["uplink_message"]["rx_metadata"][0]["gateway_ids"][
|
|
"gateway_id"
|
|
],
|
|
)
|
|
logging.debug(d)
|
|
return d
|
|
|
|
|
|
def rhf1s001_values(topic, data, srv=None):
|
|
payload = ast.literal_eval(data["payload"])
|
|
d = dict(
|
|
battery=payload["uplink_message"]["decoded_payload"]["battery"],
|
|
temperature=payload["uplink_message"]["decoded_payload"]["temp"],
|
|
humidity=payload["uplink_message"]["decoded_payload"]["hum"],
|
|
rssi=payload["uplink_message"]["rx_metadata"][0]["rssi"],
|
|
consumed_airtime=payload["uplink_message"]["consumed_airtime"].replace("s", ""),
|
|
gateway_id=payload["uplink_message"]["rx_metadata"][0]["gateway_ids"][
|
|
"gateway_id"
|
|
],
|
|
)
|
|
logging.debug(d)
|
|
return d
|
|
|
|
|
|
def lds02_values(topic, data, srv=None):
|
|
payload = ast.literal_eval(data["payload"])
|
|
d = dict(
|
|
Topic=topic,
|
|
Battery=payload["uplink_message"]["decoded_payload"]["BAT_V"],
|
|
Alarm=payload["uplink_message"]["decoded_payload"]["ALARM"],
|
|
DoorOpenStatus=payload["uplink_message"]["decoded_payload"]["DOOR_OPEN_STATUS"],
|
|
DoorOpenTimes=payload["uplink_message"]["decoded_payload"]["DOOR_OPEN_TIMES"],
|
|
LastDoorOpenDuration=payload["uplink_message"]["decoded_payload"][
|
|
"LAST_DOOR_OPEN_DURATION"
|
|
],
|
|
Mod=payload["uplink_message"]["decoded_payload"]["MOD"],
|
|
rssi=payload["uplink_message"]["rx_metadata"][0]["rssi"],
|
|
consumed_airtime=payload["uplink_message"]["consumed_airtime"].replace("s", ""),
|
|
gateway_id=payload["uplink_message"]["rx_metadata"][0]["gateway_ids"][
|
|
"gateway_id"
|
|
],
|
|
)
|
|
logging.debug(d)
|
|
return d
|
|
|
|
|
|
def RuuviGollectorDataMap(topic):
|
|
if type(topic) == str:
|
|
try:
|
|
# ruuvitag-gollector/location/place/mac
|
|
parts = topic.split('/')
|
|
location = parts[1]
|
|
place = parts[2]
|
|
except:
|
|
deviceid = 'unknown'
|
|
username = 'unknown'
|
|
return dict(location=location, place=place)
|
|
return None
|