import os import logging import io import struct import argparse import yaml from mltk.utils.uart_stream import UartStream from mltk.utils.python import install_pip_package # This is a work-around for the AlexaClient Python package that uses an older Python version import collections.abc collections.Iterable = collections.abc.Iterable collections.Mapping = collections.abc.Mapping collections.MutableSet = collections.abc.MutableSet collections.MutableMapping = collections.abc.MutableMapping # Install the 'alexa_client' Python package: # https://pypi.org/project/alexa-client install_pip_package('alexa_client') from alexa_client.refreshtoken import serve as avs_refreshtoken from alexa_client import AlexaClient from alexa_client.alexa_client import constants as avs_constants # Configure the logger logging.basicConfig(level='INFO', handlers=[logging.StreamHandler()]) logging.getLogger('hpack').setLevel('ERROR') logging.getLogger('hyper').setLevel('ERROR') logger = logging.getLogger('AlexaDemo') def main( client_id:str, secret:str, refresh_token:str, baud:int ): """This is the main entry point of the script""" logger.info('Alexa demo starting ...') alexa_client = AlexaClient( client_id=client_id, secret=secret, refresh_token=refresh_token, base_url=avs_constants.BASE_URL_NORTH_AMERICA ) # Connect to the AVS cloud alexa_client.connect() try: application_loop(alexa_client, baud=baud) except KeyboardInterrupt: pass finally: alexa_client.ping_manager.cancel() alexa_client.connection_manager.connection.close() def application_loop( alexa_client: AlexaClient, baud:int ): """This is the main application loop It retrieves commands from the development board, forwards them to the AVS cloud, and forwards the AVS responses to the development board """ with DevBoard(baud=baud) as board: dialog_request_id = None while True: logger.info('Waiting for an "Alexa" command. (speak into the dev board microphone)') alexa_command = board.wait_for_command() logger.info('Sending "Alexa" command to AVS cloud') directives = alexa_client.send_audio_file( alexa_command, dialog_request_id=dialog_request_id, audio_format=avs_constants.OPUS, distance_profile=avs_constants.CLOSE_TALK ) if not directives: logger.warning('Failed to receive response from AVS') continue dialog_request_id = None for directive in directives: if directive.name == 'ExpectSpeech': logger.info('Expecting a response from user ...') dialog_request_id = directive.dialog_request_id board.start_microphone(at_end_of_speaker_audio=True) elif directive.name in ['Speak', 'Play']: logger.info(f'Received response from AVS cloud: {len(directive.audio_attachment)} bytes') board.play_audio(directive.audio_attachment) class DevBoard: """This is a helper class to communicate with the dev board's UART""" # These commands are also defined in audio_io.cc CMD_START_MICROPHONE = 0 CMD_STOP_MICROPHONE = 1 CMD_START_MICROPHONE_AT_END_OF_SPEAKER_AUDIO = 2 CMD_PLAY_AUDIO = 3 def __init__( self, baud: 115200 ): self._uart = UartStream(baud=baud) def open(self): self._uart.open() def close(self): self._uart.close() def wait_for_command(self) -> io.BytesIO: """Wait and retrieve a command spoken to the dev board's microphone and return the Opus-encoded audio as a binary file object """ data_buffer = io.BytesIO() while True: cmd = self._uart.read_command() if cmd.code == self.CMD_STOP_MICROPHONE: logger.info(f'Command received: {data_buffer.tell()} bytes') self._uart.flush_input() data_buffer.seek(0) return data_buffer data = self._uart.read() if not data: self._uart.wait(0.100) continue if data_buffer.getbuffer().nbytes == 0: logger.info('Receiving command ...') data_buffer.write(data) def play_audio(self, mp3_data:bytes): """Send the MP3-encoded audio to the development board""" self._uart.write_command(self.CMD_PLAY_AUDIO, struct.pack('