apiVersion: v1
kind: ConfigMap
metadata:
  name: transformers
data:
  transformers.py: |
    """Payload transformers that flatten sensor uplink messages into dicts.

    Every public ``*_values`` function takes ``(topic, data, srv=None)``,
    parses ``data["payload"]`` and returns a flat dict of selected fields.
    """
    import ast
    import json
    import logging


    def _parse_payload(data):
        """Return ``data["payload"]`` parsed into Python objects.

        JSON is tried first so payloads containing ``true``/``false``/``null``
        are accepted; text that is not valid JSON (for example a Python dict
        repr with single quotes) falls back to ``ast.literal_eval``, which is
        what this module used originally.

        Raises:
            KeyError: if ``data`` has no ``"payload"`` entry.
            ValueError, SyntaxError: from ``ast.literal_eval`` when the
                payload is neither JSON nor a Python literal.
        """
        raw = data["payload"]
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass. Falling through
            # keeps the original exception behaviour for unparsable input.
            return ast.literal_eval(raw)


    def _airtime(uplink):
        """Return ``consumed_airtime`` as a string with every "s" removed."""
        return uplink["consumed_airtime"].replace("s", "")


    def laq4_values(topic, data, srv=None):
        """Extract air-quality readings and radio metadata from an uplink.

        Args:
            topic: Unused by this transformer.
            data: Mapping whose ``"payload"`` entry holds the serialized
                uplink message.
            srv: Unused.

        Returns:
            dict with keys ``battery``, ``temperature``, ``humidity``,
            ``tvoc_ppb``, ``co2_ppm``, ``rssi``, ``consumed_airtime`` and
            ``gateway_id``.

        Raises:
            KeyError, IndexError: if an expected field is missing.
        """
        uplink = _parse_payload(data)["uplink_message"]
        decoded = uplink["decoded_payload"]
        # Only the first rx_metadata entry is reported.
        rx = uplink["rx_metadata"][0]
        d = dict(
            battery=decoded["Bat_V"],
            temperature=decoded["TempC_SHT"],
            humidity=decoded["Hum_SHT"],
            tvoc_ppb=decoded["TVOC_ppb"],
            co2_ppm=decoded["CO2_ppm"],
            rssi=rx["rssi"],
            consumed_airtime=_airtime(uplink),
            gateway_id=rx["gateway_ids"]["gateway_id"],
        )
        logging.debug(d)
        return d


    def lht52_values(topic, data, srv=None):
        """Extract temperature/humidity readings and radio metadata.

        Args:
            topic: Unused by this transformer.
            data: Mapping whose ``"payload"`` entry holds the serialized
                uplink message.
            srv: Unused.

        Returns:
            dict with keys ``device_id``, ``temperature``, ``humidity``,
            ``consumed_airtime``, ``rssi`` and ``gateway_id``.

        Raises:
            KeyError, IndexError: if an expected field is missing.
        """
        payload = _parse_payload(data)
        uplink = payload["uplink_message"]
        decoded = uplink["decoded_payload"]
        # Only the first rx_metadata entry is reported.
        rx = uplink["rx_metadata"][0]
        d = dict(
            device_id=payload["end_device_ids"]["device_id"],
            temperature=decoded["TempC_SHT"],
            humidity=decoded["Hum_SHT"],
            consumed_airtime=_airtime(uplink),
            rssi=rx["rssi"],
            gateway_id=rx["gateway_ids"]["gateway_id"],
        )
        logging.debug(d)
        return d


    def rhf1s001_values(topic, data, srv=None):
        """Extract battery/temperature/humidity readings and radio metadata.

        Args:
            topic: Unused by this transformer.
            data: Mapping whose ``"payload"`` entry holds the serialized
                uplink message.
            srv: Unused.

        Returns:
            dict with keys ``battery``, ``temperature``, ``humidity``,
            ``rssi``, ``consumed_airtime`` and ``gateway_id``.

        Raises:
            KeyError, IndexError: if an expected field is missing.
        """
        uplink = _parse_payload(data)["uplink_message"]
        decoded = uplink["decoded_payload"]
        # Only the first rx_metadata entry is reported.
        rx = uplink["rx_metadata"][0]
        d = dict(
            battery=decoded["battery"],
            temperature=decoded["temp"],
            humidity=decoded["hum"],
            rssi=rx["rssi"],
            consumed_airtime=_airtime(uplink),
            gateway_id=rx["gateway_ids"]["gateway_id"],
        )
        logging.debug(d)
        return d


    def lds02_values(topic, data, srv=None):
        """Extract door-sensor state and radio metadata from an uplink.

        Args:
            topic: Copied verbatim into the result under ``Topic``.
            data: Mapping whose ``"payload"`` entry holds the serialized
                uplink message.
            srv: Unused.

        Returns:
            dict with keys ``Topic``, ``Battery``, ``Alarm``,
            ``DoorOpenStatus``, ``DoorOpenTimes``, ``LastDoorOpenDuration``,
            ``Mod``, ``rssi``, ``consumed_airtime`` and ``gateway_id``.

        Raises:
            KeyError, IndexError: if an expected field is missing.
        """
        uplink = _parse_payload(data)["uplink_message"]
        decoded = uplink["decoded_payload"]
        # Only the first rx_metadata entry is reported.
        rx = uplink["rx_metadata"][0]
        d = dict(
            Topic=topic,
            Battery=decoded["BAT_V"],
            Alarm=decoded["ALARM"],
            DoorOpenStatus=decoded["DOOR_OPEN_STATUS"],
            DoorOpenTimes=decoded["DOOR_OPEN_TIMES"],
            LastDoorOpenDuration=decoded["LAST_DOOR_OPEN_DURATION"],
            Mod=decoded["MOD"],
            rssi=rx["rssi"],
            consumed_airtime=_airtime(uplink),
            gateway_id=rx["gateway_ids"]["gateway_id"],
        )
        logging.debug(d)
        return d


    def pycomruuvi_values(sensorname, payload, topic):
        """Extract one named sensor's readings from an already-parsed payload.

        Args:
            sensorname: Key of the sensor inside ``decoded_payload``.
            payload: Parsed uplink message (a dict, not a string).
            topic: Copied verbatim into the result under ``topic``.

        Returns:
            dict with keys ``topic``, ``battery`` (raw value divided by
            1000), ``humidity``, ``rssi`` and ``temperature``.

        Raises:
            KeyError: if the sensor or one of its fields is missing.
        """
        sensor = payload["uplink_message"]["decoded_payload"][sensorname]
        d = dict(
            topic=topic,
            battery=sensor["battery"] / 1000,
            humidity=sensor["humidity"],
            rssi=sensor["rssi"],
            temperature=sensor["temperature"],
        )
        logging.debug(d)
        return d


    def pycomruuvi_values_camperentry(topic, data, srv=None):
        """Return the ``camperentry`` sensor readings from ``data["payload"]``."""
        payload = _parse_payload(data)
        return pycomruuvi_values(sensorname="camperentry", payload=payload, topic=topic)


    def pycomruuvi_values_campersleepingzone(topic, data, srv=None):
        """Return the ``campersleepingzone`` readings from ``data["payload"]``."""
        payload = _parse_payload(data)
        return pycomruuvi_values(
            sensorname="campersleepingzone", payload=payload, topic=topic
        )


    def RuuviGollectorDataMap(topic):
        """Split a topic into its ``location`` and ``place`` segments.

        The expected topic layout is ``ruuvitag-gollector/location/place/mac``.

        Args:
            topic: Topic string; any non-string value yields ``None``.

        Returns:
            ``dict(location=..., place=...)`` for string topics, with
            ``"unknown"`` substituted for any segment the topic lacks, or
            ``None`` when ``topic`` is not a string.
        """
        if not isinstance(topic, str):
            return None
        parts = topic.split("/")
        # A short topic previously raised UnboundLocalError here because the
        # fallback branch assigned unrelated names; default each field instead.
        location = parts[1] if len(parts) > 1 else "unknown"
        place = parts[2] if len(parts) > 2 else "unknown"
        return dict(location=location, place=place)