#!/usr/bin/python -u # coding=utf-8 # "DATASHEET": http://cl.ly/ekot # https://gist.github.com/kadamski/92653913a53baf9dd1a8 from __future__ import print_function import serial, struct, sys, time, json, subprocess, signal, logging DEBUG = 0 CMD_MODE = 2 CMD_QUERY_DATA = 4 CMD_DEVICE_ID = 5 CMD_SLEEP = 6 CMD_FIRMWARE = 7 CMD_WORKING_PERIOD = 8 MODE_ACTIVE = 0 MODE_QUERY = 1 PERIOD_CONTINUOUS = 0 WAKE_UP_DELAY = 30 SAMPLING_ITI = 15 SENSOR_TIMEOUT = 10 ser = serial.Serial() ser.port = "/dev/ttyUSB0" ser.baudrate = 9600 byte, data = 0, "" def dump(d, prefix=''): logging.debug(prefix + ' '.join(x.encode('hex') for x in d)) def construct_command(cmd, data=[]): assert len(data) <= 12 data += [0,]*(12-len(data)) checksum = (sum(data)+cmd-2)%256 ret = "\xaa\xb4" + chr(cmd) ret += ''.join(chr(x) for x in data) ret += "\xff\xff" + chr(checksum) + "\xab" if DEBUG: dump(ret, '> ') return ret def process_data(d): r = struct.unpack('>8) % 256 id_l = id % 256 ser.write(construct_command(CMD_DEVICE_ID, [0]*10+[id_l, id_h])) read_response() def init_sensor(): ser.open() ser.flushInput() cmd_set_sleep(0) cmd_firmware_ver() cmd_set_working_period(PERIOD_CONTINUOUS) cmd_set_mode(MODE_QUERY); signal.signal(signal.SIGALRM, handler) def get_sensor_data(): cmd_set_sleep(0) logging.info("Wait a while for the sensor to wake up ...") time.sleep(WAKE_UP_DELAY) for t in range(SAMPLING_ITI): try: signal.alarm(SENSOR_TIMEOUT) values = cmd_query_data(); signal.alarm(0) if values is not None and len(values) == 2: data = values logging.debug(str(t) + ". PM2.5: " + str(values[0]) + ", PM10: " + str(values[1])) time.sleep(2) except Exception, exc: logging.error(exc) # Put sensor to sleep cmd_set_sleep(1) time.sleep(10) return data def handler(signum, frame): logging.info("Forever is over!") raise Exception("end of time")