Amazon Web Services
AWS (Amazon Web Services) is a cloud service that offers easy data storage using MQTT, HTTP or websockets protocol. This example uses MQTT to send simple test data to the AWS server. For more information, visit the Amazon Website.
Be aware that this example uses an old version of the MQTT library and may be missing features that are currently available.
Connectivity code (demo.py):
import time
from mqttclient import MQTTClient
DISCONNECTED = 0
CONNECTING = 1
CONNECTED = 2
DEVICE_ID = "12345"
HOST = "data.iot.us-west-2.amazonaws.com"
TOPIC_DOWNLOAD = "Download"
TOPIC_UPLOAD = "Upload"
state = DISCONNECTED
connection = None
def _recv_msg_callback(topic, msg):
print("Received: {} from Topic: {}".format(msg, topic))
def _send_msg(msg):
global connection
connection.publish(TOPIC_UPLOAD, msg)
def run():
global state
global connection
while True:
# Wait for connection
while state != CONNECTED:
try:
state = CONNECTING
connection = MQTTClient(DEVICE_ID, server=HOST, port=8883)
connection.connect(ssl=True, certfile='/flash/cert/certificate.crt', keyfile='/flash/cert/privateKey.key', ca_certs='/flash/cert/root-CA.cer')
state = CONNECTED
except:
print('Error connecting to the server')
time.sleep(0.5)
continue
print('Connected!')
# Subscribe for messages
connection.set_callback(_recv_msg_callback)
connection.subscribe(TOPIC_DOWNLOAD)
while state == CONNECTED:
connection.check_msg()
msg = '{"Name":"Pycom", "Data":"Test"}'
print('Sending: ' + msg)
_send_msg(msg)
time.sleep(2.0)
Usage (main.py)
import demo
demo.run()
MQTT client class:
import socket
import struct
import select
from binascii import hexlify
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(self, client_id, server, port=1883, user=None, password=None):
self.client_id = client_id.encode('utf8')
self.sock = None
self.addr = socket.getaddrinfo(server, port)[0][-1]
self.pid = 0
self.cb = None
self.poll = select.poll()
self.__will_message = None
if user:
self.__user = user.encode('utf8')
else:
self.__user = None
self.__password = password
def __encode_varlen_length(self, length):
i = 0
buff = bytearray()
while 1:
buff.append(length % 128)
length = length // 128
if length > 0:
buff[i] = buff[i] | 0x80
i += 1
else:
break
return buff
def __encode_16(self, x):
return struct.pack("!H", x)
def __pascal_string(self, s):
return struct.pack("!H", len(s)) + s
def __recv_varlen_length(self):
m = 1
val = 0
while 1:
b = self.sock.recv(1)[0]
val += (b & 0x7F) * m
m *= 128
if m > 2097152: # 128 * 128 * 128
raise MQTTException()
if (b & 0x80) == 0:
break
return val
def set_callback(self, f):
self.cb = f
def set_will(self, will_topic, will_message, will_qos=0, will_retain=0):
if will_topic:
self.__will_topic = will_topic.encode('utf8')
self.__will_message = will_message
self.__will_qos = will_qos
self.__will_retain = will_retain
def connect(self, clean_session=True, ssl=False, certfile=None, keyfile=None, ca_certs=None):
try:
self.poll.unregister(self.sock)
except:
pass
self.sock = socket.socket()
if ssl:
import ssl
self.sock = ssl.wrap_socket(self.sock, certfile=certfile, keyfile=keyfile, ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED)
self.sock.connect(self.addr)
self.poll.register(self.sock, select.POLLIN)
pkt_len = (12 + len(self.client_id) + # 10 + 2 + len(client_id)
(2 + len(self.__user) if self.__user else 0) +
(2 + len(self.__password) if self.__password else 0))
flags = (0x80 if self.__user else 0x00) | (0x40 if self.__password else 0x00) | (0x02 if clean_session else 0x00)
if self.__will_message:
flags |= (self.__will_retain << 3 | self.__will_qos << 1 | 1) << 2
pkt_len += 4 + len(self.__will_topic) + len(self.__will_message)
pkt = bytearray([0x10]) # connect
pkt.extend(self.__encode_varlen_length(pkt_len)) # len of the remaining
pkt.extend(b'\x00\x04MQTT\x04') # len of "MQTT" (16 bits), protocol name, and protocol version
pkt.append(flags)
pkt.extend(b'\x00\x00') # disable keepalive
pkt.extend(self.__pascal_string(self.client_id))
if self.__will_message:
pkt.extend(self.__pascal_string(self.__will_topic))
pkt.extend(self.__pascal_string(self.__will_message))
if self.__user:
pkt.extend(self.__pascal_string(self.__user))
if self.__password:
pkt.extend(self.__pascal_string(self.__password))
self.sock.send(pkt)
resp = self.sock.recv(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.send(b"\xe0\0")
self.sock.close()
def ping(self):
self.sock.send(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0, dup=0):
topic = topic.encode('utf8')
hdr = 0x30 | (dup << 3) | (qos << 1) | retain
pkt_len = (2 + len(topic) +
(2 if qos else 0) +
(len(msg)))
pkt = bytearray()
pkt.append(hdr)
pkt.extend(self.__encode_varlen_length(pkt_len)) # len of the remaining
pkt.extend(self.__pascal_string(topic))
if qos:
self.pid += 1 #todo: I don't think this is the way to deal with the packet id
pkt.extend(self.__encode_16(self.pid))
self.sock.send(pkt)
self.sock.send(msg)
#todo: check next part of the code
if qos == 1:
while 1:
rcv_pid = self.recv_pubconf(0)
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def recv_pubconf(self, t):
headers = [0x40, 0x50, 0x62, 0x70]
header = headers[t]
while 1:
op = self.wait_msg()
if op == header:
sz = self.sock.recv(1)
assert sz == b"\x02"
return
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
topic = topic.encode('utf8')
pkt_len = 2 + 2 + len(topic) + 1 # packet identifier + len of topic (16 bits) + topic len + QOS
self.pid += 1
pkt = bytearray([0x82])
pkt.extend(self.__encode_varlen_length(pkt_len)) # len of the remaining
pkt.extend(self.__encode_16(self.pid))
pkt.extend(self.__pascal_string(topic))
pkt.append(qos)
self.sock.send(pkt)
resp = self.sock.recv(5)
#print(resp)
assert resp[0] == 0x90
assert resp[2] == pkt[2] and resp[3] == pkt[3]
if resp[4] == 0x80:
raise MQTTException(resp[4])
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
def wait_msg(self):
res = self.sock.recv(1)
self.sock.setblocking(True)
if res is None or res == b"":
return None
#if res == b"":
# raise OSError(-1)
if res == b"\xd0": # PINGRESP
sz = self.sock.recv(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xf0 != 0x30:
return op
sz = self.__recv_varlen_length()
topic_len = self.sock.recv(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.recv(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.recv(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.recv(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.send(pkt)
elif op & 6 == 4:
assert 0
# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()