diff --git a/bumble/at.py b/bumble/at.py new file mode 100644 index 00000000..78a4b086 --- /dev/null +++ b/bumble/at.py @@ -0,0 +1,85 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Union + + +def tokenize_parameters(buffer: bytes) -> List[bytes]: + """Split input parameters into tokens. + Removes space characters outside of double quote blocks: + T-rec-V-25 - 5.2.1 Command line general format: "Space characters (IA5 2/0) + are ignored [..], unless they are embedded in numeric or string constants" + Raises ValueError in case of invalid input string.""" + + tokens = [] + in_quotes = False + token = bytearray() + for b in buffer: + char = bytearray([b]) + + if in_quotes: + token.extend(char) + if char == b'\"': + in_quotes = False + tokens.append(token[1:-1]) + token = bytearray() + else: + if char == b' ': + pass + elif char == b',' or char == b')': + tokens.append(token) + tokens.append(char) + token = bytearray() + elif char == b'(': + if len(token) > 0: + raise ValueError("open_paren following regular character") + tokens.append(char) + elif char == b'"': + if len(token) > 0: + raise ValueError("quote following regular character") + in_quotes = True + token.extend(char) + else: + token.extend(char) + + tokens.append(token) + return [bytes(token) for token in tokens if len(token) > 0] + + +def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]: + """Parse the parameters using the comma and parenthesis separators. + Raises ValueError in case of invalid input string.""" + + tokens = tokenize_parameters(buffer) + accumulator: List[list] = [[]] + current: Union[bytes, list] = bytes() + + for token in tokens: + if token == b',': + accumulator[-1].append(current) + current = bytes() + elif token == b'(': + accumulator.append([]) + elif token == b')': + if len(accumulator) < 2: + raise ValueError("close_paren without matching open_paren") + accumulator[-1].append(current) + current = accumulator.pop() + else: + current = token + + accumulator[-1].append(current) + if len(accumulator) > 1: + raise ValueError("missing close_paren") + return accumulator[0] diff --git a/bumble/hfp.py b/bumble/hfp.py index 9080a55b..6d9e4282 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -1,4 +1,4 @@ -# Copyright 2021-2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,11 +17,31 @@ # ----------------------------------------------------------------------------- import logging import asyncio -import collections -from typing import Union +import dataclasses +import enum +import traceback +from typing import Dict, List, Union, Set +from . import at from . import rfcomm -from .colors import color + +from bumble.core import ( + ProtocolError, + BT_GENERIC_AUDIO_SERVICE, + BT_HANDSFREE_SERVICE, + BT_L2CAP_PROTOCOL_ID, + BT_RFCOMM_PROTOCOL_ID, +) +from bumble.sdp import ( + DataElement, + ServiceAttribute, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, +) + # ----------------------------------------------------------------------------- # Logging @@ -30,72 +50,700 @@ # ----------------------------------------------------------------------------- -# Protocol Support +# Normative protocol definitions # ----------------------------------------------------------------------------- + +# HF supported features (AT+BRSF=) (normative). +# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 +# and 3GPP 27.007 +class HfFeature(enum.IntFlag): + EC_NR = 0x001 # Echo Cancel & Noise reduction + THREE_WAY_CALLING = 0x002 + CLI_PRESENTATION_CAPABILITY = 0x004 + VOICE_RECOGNITION_ACTIVATION = 0x008 + REMOTE_VOLUME_CONTROL = 0x010 + ENHANCED_CALL_STATUS = 0x020 + ENHANCED_CALL_CONTROL = 0x040 + CODEC_NEGOTIATION = 0x080 + HF_INDICATORS = 0x100 + ESCO_S4_SETTINGS_SUPPORTED = 0x200 + ENHANCED_VOICE_RECOGNITION_STATUS = 0x400 + VOICE_RECOGNITION_TEST = 0x800 + + +# AG supported features (+BRSF:) (normative). +# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 +# and 3GPP 27.007 +class AgFeature(enum.IntFlag): + THREE_WAY_CALLING = 0x001 + EC_NR = 0x002 # Echo Cancel & Noise reduction + VOICE_RECOGNITION_FUNCTION = 0x004 + IN_BAND_RING_TONE_CAPABILITY = 0x008 + VOICE_TAG = 0x010 # Attach a number to voice tag + REJECT_CALL = 0x020 # Ability to reject a call + ENHANCED_CALL_STATUS = 0x040 + ENHANCED_CALL_CONTROL = 0x080 + EXTENDED_ERROR_RESULT_CODES = 0x100 + CODEC_NEGOTIATION = 0x200 + HF_INDICATORS = 0x400 + ESCO_S4_SETTINGS_SUPPORTED = 0x800 + ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000 + VOICE_RECOGNITION_TEST = 0x2000 + + +# Audio Codec IDs (normative). +# Hands-Free Profile v1.8, 10 Appendix B +class AudioCodec(enum.IntEnum): + CVSD = 0x01 # Support for CVSD audio codec + MSBC = 0x02 # Support for mSBC audio codec + + +# HF Indicators (normative). +# Bluetooth Assigned Numbers, 6.10.1 HF Indicators +class HfIndicator(enum.IntEnum): + ENHANCED_SAFETY = 0x01 # Enhanced safety feature + BATTERY_LEVEL = 0x02 # Battery level feature + + +# Call Hold supported operations (normative). +# AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services +class CallHoldOperation(enum.IntEnum): + RELEASE_ALL_HELD_CALLS = 0 # Release all held calls + RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other + HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other + ADD_HELD_CALL = 3 # Adds a held call to conversation + + +# Response Hold status (normative). +# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 +# and 3GPP 27.007 +class ResponseHoldStatus(enum.IntEnum): + INC_CALL_HELD = 0 # Put incoming call on hold + HELD_CALL_ACC = 1 # Accept a held incoming call + HELD_CALL_REJ = 2 # Reject a held incoming call + + +# Values for the Call Setup AG indicator (normative). +# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 +# and 3GPP 27.007 +class CallSetupAgIndicator(enum.IntEnum): + NOT_IN_CALL_SETUP = 0 + INCOMING_CALL_PROCESS = 1 + OUTGOING_CALL_SETUP = 2 + REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call + + +# Values for the Call Held AG indicator (normative). +# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 +# and 3GPP 27.007 +class CallHeldAgIndicator(enum.IntEnum): + NO_CALLS_HELD = 0 + # Call is placed on hold or active/held calls swapped + # (The AG has both an active AND a held call) + CALL_ON_HOLD_AND_ACTIVE_CALL = 1 + CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call + + +# Call Info direction (normative). +# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls +class CallInfoDirection(enum.IntEnum): + MOBILE_ORIGINATED_CALL = 0 + MOBILE_TERMINATED_CALL = 1 + + +# Call Info status (normative). +# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls +class CallInfoStatus(enum.IntEnum): + ACTIVE = 0 + HELD = 1 + DIALING = 2 + ALERTING = 3 + INCOMING = 4 + WAITING = 5 + + +# Call Info mode (normative). +# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls +class CallInfoMode(enum.IntEnum): + VOICE = 0 + DATA = 1 + FAX = 2 + UNKNOWN = 9 + + +# ----------------------------------------------------------------------------- +# Hands-Free Control Interoperability Requirements # ----------------------------------------------------------------------------- -class HfpProtocol: + +# Response codes. +RESPONSE_CODES = [ + "+APLSIRI", + "+BAC", + "+BCC", + "+BCS", + "+BIA", + "+BIEV", + "+BIND", + "+BINP", + "+BLDN", + "+BRSF", + "+BTRH", + "+BVRA", + "+CCWA", + "+CHLD", + "+CHUP", + "+CIND", + "+CLCC", + "+CLIP", + "+CMEE", + "+CMER", + "+CNUM", + "+COPS", + "+IPHONEACCEV", + "+NREC", + "+VGM", + "+VGS", + "+VTS", + "+XAPL", + "A", + "D", +] + +# Unsolicited responses and statuses. +UNSOLICITED_CODES = [ + "+APLSIRI", + "+BCS", + "+BIND", + "+BSIR", + "+BTRH", + "+BVRA", + "+CCWA", + "+CIEV", + "+CLIP", + "+VGM", + "+VGS", + "BLACKLISTED", + "BUSY", + "DELAYED", + "NO ANSWER", + "NO CARRIER", + "RING", +] + +# Status codes +STATUS_CODES = [ + "+CME ERROR", + "BLACKLISTED", + "BUSY", + "DELAYED", + "ERROR", + "NO ANSWER", + "NO CARRIER", + "OK", +] + + +@dataclasses.dataclass +class Configuration: + supported_hf_features: List[HfFeature] + supported_hf_indicators: List[HfIndicator] + supported_audio_codecs: List[AudioCodec] + + +class AtResponseType(enum.Enum): + """Indicate if a response is expected from an AT command, and if multiple + responses are accepted.""" + + NONE = 0 + SINGLE = 1 + MULTIPLE = 2 + + +class AtResponse: + code: str + parameters: list + + def __init__(self, response: bytearray): + code_and_parameters = response.split(b':') + parameters = ( + code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray() + ) + self.code = code_and_parameters[0].decode() + self.parameters = at.parse_parameters(parameters) + + +@dataclasses.dataclass +class AgIndicatorState: + description: str + index: int + supported_values: Set[int] + current_status: int + + +@dataclasses.dataclass +class HfIndicatorState: + supported: bool = False + enabled: bool = False + + +class HfProtocol: + """Implementation for the Hands-Free side of the Hands-Free profile. + Reference specification Hands-Free Profile v1.8""" + + supported_hf_features: int + supported_audio_codecs: List[AudioCodec] + + supported_ag_features: int + supported_ag_call_hold_operations: List[CallHoldOperation] + + ag_indicators: List[AgIndicatorState] + hf_indicators: Dict[HfIndicator, HfIndicatorState] + dlc: rfcomm.DLC - buffer: str - lines: collections.deque - lines_available: asyncio.Event + command_lock: asyncio.Lock + response_queue: asyncio.Queue + unsolicited_queue: asyncio.Queue + read_buffer: bytearray - def __init__(self, dlc: rfcomm.DLC) -> None: + def __init__(self, dlc: rfcomm.DLC, configuration: Configuration): + # Configure internal state. self.dlc = dlc - self.buffer = '' - self.lines = collections.deque() - self.lines_available = asyncio.Event() - - dlc.sink = self.feed - - def feed(self, data: Union[bytes, str]) -> None: - # Convert the data to a string if needed - if isinstance(data, bytes): - data = data.decode('utf-8') - - logger.debug(f'<<< Data received: {data}') - - # Add to the buffer and look for lines - self.buffer += data - while (separator := self.buffer.find('\r')) >= 0: - line = self.buffer[:separator].strip() - self.buffer = self.buffer[separator + 1 :] - if len(line) > 0: - self.on_line(line) - - def on_line(self, line: str) -> None: - self.lines.append(line) - self.lines_available.set() - - def send_command_line(self, line: str) -> None: - logger.debug(color(f'>>> {line}', 'yellow')) - self.dlc.write(line + '\r') - - def send_response_line(self, line: str) -> None: - logger.debug(color(f'>>> {line}', 'yellow')) - self.dlc.write('\r\n' + line + '\r\n') - - async def next_line(self) -> str: - await self.lines_available.wait() - line = self.lines.popleft() - if not self.lines: - self.lines_available.clear() - logger.debug(color(f'<<< {line}', 'green')) - return line - - async def initialize_service(self) -> None: - # Perform Service Level Connection Initialization - self.send_command_line('AT+BRSF=2072') # Retrieve Supported Features - await (self.next_line()) - await (self.next_line()) - - self.send_command_line('AT+CIND=?') - await (self.next_line()) - await (self.next_line()) - - self.send_command_line('AT+CIND?') - await (self.next_line()) - await (self.next_line()) - - self.send_command_line('AT+CMER=3,0,0,1') - await (self.next_line()) + self.command_lock = asyncio.Lock() + self.response_queue = asyncio.Queue() + self.unsolicited_queue = asyncio.Queue() + self.read_buffer = bytearray() + + # Build local features. + self.supported_hf_features = sum(configuration.supported_hf_features) + self.supported_audio_codecs = configuration.supported_audio_codecs + + self.hf_indicators = { + indicator: HfIndicatorState() + for indicator in configuration.supported_hf_indicators + } + + # Clear remote features. + self.supported_ag_features = 0 + self.supported_ag_call_hold_operations = [] + self.ag_indicators = [] + + # Bind the AT reader to the RFCOMM channel. + self.dlc.sink = self._read_at + + def supports_hf_feature(self, feature: HfFeature) -> bool: + return (self.supported_hf_features & feature) != 0 + + def supports_ag_feature(self, feature: AgFeature) -> bool: + return (self.supported_ag_features & feature) != 0 + + # Read AT messages from the RFCOMM channel. + # Enqueue AT commands, responses, unsolicited responses to their + # respective queues, and set the corresponding event. + def _read_at(self, data: bytes): + # Append to the read buffer. + self.read_buffer.extend(data) + + # Locate header and trailer. + header = self.read_buffer.find(b'\r\n') + trailer = self.read_buffer.find(b'\r\n', header + 2) + if header == -1 or trailer == -1: + return + + # Isolate the AT response code and parameters. + raw_response = self.read_buffer[header + 2 : trailer] + response = AtResponse(raw_response) + logger.debug(f"<<< {raw_response.decode()}") + + # Consume the response bytes. + self.read_buffer = self.read_buffer[trailer + 2 :] + + # Forward the received code to the correct queue. + if self.command_lock.locked() and ( + response.code in STATUS_CODES or response.code in RESPONSE_CODES + ): + self.response_queue.put_nowait(response) + elif response.code in UNSOLICITED_CODES: + self.unsolicited_queue.put_nowait(response) + else: + logger.warning(f"dropping unexpected response with code '{response.code}'") + + # Send an AT command and wait for the peer resposne. + # Wait for the AT responses sent by the peer, to the status code. + # Raises asyncio.TimeoutError if the status is not received + # after a timeout (default 1 second). + # Raises ProtocolError if the status is not OK. + async def execute_command( + self, + cmd: str, + timeout: float = 1.0, + response_type: AtResponseType = AtResponseType.NONE, + ) -> Union[None, AtResponse, List[AtResponse]]: + async with self.command_lock: + logger.debug(f">>> {cmd}") + self.dlc.write(cmd + '\r') + responses: List[AtResponse] = [] + + while True: + result = await asyncio.wait_for( + self.response_queue.get(), timeout=timeout + ) + if result.code == 'OK': + if response_type == AtResponseType.SINGLE and len(responses) != 1: + raise ProtocolError("NO ANSWER") + + if response_type == AtResponseType.MULTIPLE: + return responses + if response_type == AtResponseType.SINGLE: + return responses[0] + return None + if result.code in STATUS_CODES: + raise ProtocolError(result.code) + responses.append(result) + + # 4.2.1 Service Level Connection Initialization. + async def initiate_slc(self): + # 4.2.1.1 Supported features exchange + # First, in the initialization procedure, the HF shall send the + # AT+BRSF= command to the AG to both notify + # the AG of the supported features in the HF, as well as to retrieve the + # supported features in the AG using the +BRSF result code. + response = await self.execute_command( + f"AT+BRSF={self.supported_hf_features}", response_type=AtResponseType.SINGLE + ) + + self.supported_ag_features = int(response.parameters[0]) + logger.info(f"supported AG features: {self.supported_ag_features}") + for feature in AgFeature: + if self.supports_ag_feature(feature): + logger.info(f" - {feature.name}") + + # 4.2.1.2 Codec Negotiation + # Secondly, in the initialization procedure, if the HF supports the + # Codec Negotiation feature, it shall check if the AT+BRSF command + # response from the AG has indicated that it supports the Codec + # Negotiation feature. + if self.supports_hf_feature( + HfFeature.CODEC_NEGOTIATION + ) and self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): + # If both the HF and AG do support the Codec Negotiation feature + # then the HF shall send the AT+BAC= command to + # the AG to notify the AG of the available codecs in the HF. + codecs = [str(c) for c in self.supported_audio_codecs] + await self.execute_command(f"AT+BAC={','.join(codecs)}") + + # 4.2.1.3 AG Indicators + # After having retrieved the supported features in the AG, the HF shall + # determine which indicators are supported by the AG, as well as the + # ordering of the supported indicators. This is because, according to + # the 3GPP 27.007 specification [2], the AG may support additional + # indicators not provided for by the Hands-Free Profile, and because the + # ordering of the indicators is implementation specific. The HF uses + # the AT+CIND=? Test command to retrieve information about the supported + # indicators and their ordering. + response = await self.execute_command( + "AT+CIND=?", response_type=AtResponseType.SINGLE + ) + + self.ag_indicators = [] + for index, indicator in enumerate(response.parameters): + description = indicator[0].decode() + supported_values = [] + for value in indicator[1]: + value = value.split(b'-') + value = [int(v) for v in value] + value_min = value[0] + value_max = value[1] if len(value) > 1 else value[0] + supported_values.extend([v for v in range(value_min, value_max + 1)]) + + self.ag_indicators.append( + AgIndicatorState(description, index, set(supported_values), 0) + ) + + # Once the HF has the necessary supported indicator and ordering + # information, it shall retrieve the current status of the indicators + # in the AG using the AT+CIND? Read command. + response = await self.execute_command( + "AT+CIND?", response_type=AtResponseType.SINGLE + ) + + for index, indicator in enumerate(response.parameters): + self.ag_indicators[index].current_status = int(indicator) + + # After having retrieved the status of the indicators in the AG, the HF + # shall then enable the "Indicators status update" function in the AG by + # issuing the AT+CMER command, to which the AG shall respond with OK. + await self.execute_command("AT+CMER=3,,,1") + + if self.supports_hf_feature( + HfFeature.THREE_WAY_CALLING + ) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING): + # After the HF has enabled the “Indicators status update” function in + # the AG, and if the “Call waiting and 3-way calling” bit was set in the + # supported features bitmap by both the HF and the AG, the HF shall + # issue the AT+CHLD=? test command to retrieve the information about how + # the call hold and multiparty services are supported in the AG. The HF + # shall not issue the AT+CHLD=? test command in case either the HF or + # the AG does not support the "Three-way calling" feature. + response = await self.execute_command( + "AT+CHLD=?", response_type=AtResponseType.SINGLE + ) + + self.supported_ag_call_hold_operations = [ + CallHoldOperation(int(operation)) + for operation in response.parameters[0] + if not b'x' in operation + ] + + # 4.2.1.4 HF Indicators + # If the HF supports the HF indicator feature, it shall check the +BRSF + # response to see if the AG also supports the HF Indicator feature. + if self.supports_hf_feature( + HfFeature.HF_INDICATORS + ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): + # If both the HF and AG support the HF Indicator feature, then the HF + # shall send the AT+BIND= command to the AG + # to notify the AG of the supported indicators’ assigned numbers in the + # HF. The AG shall respond with OK + indicators = [str(i) for i in self.hf_indicators.keys()] + await self.execute_command(f"AT+BIND={','.join(indicators)}") + + # After having provided the AG with the HF indicators it supports, + # the HF shall send the AT+BIND=? to request HF indicators supported + # by the AG. The AG shall reply with the +BIND response listing all + # HF indicators that it supports followed by an OK. + response = await self.execute_command( + "AT+BIND=?", response_type=AtResponseType.SINGLE + ) + + logger.info("supported HF indicators:") + for indicator in response.parameters[0]: + indicator = HfIndicator(int(indicator)) + logger.info(f" - {indicator.name}") + if indicator in self.hf_indicators: + self.hf_indicators[indicator].supported = True + + # Once the HF receives the supported HF indicators list from the AG, + # the HF shall send the AT+BIND? command to determine which HF + # indicators are enabled. The AG shall respond with one or more + # +BIND responses. The AG shall terminate the list with OK. + # (See Section 4.36.1.3). + responses = await self.execute_command( + "AT+BIND?", response_type=AtResponseType.MULTIPLE + ) + + logger.info("enabled HF indicators:") + for response in responses: + indicator = HfIndicator(int(response.parameters[0])) + enabled = int(response.parameters[1]) != 0 + logger.info(f" - {indicator.name}: {enabled}") + if indicator in self.hf_indicators: + self.hf_indicators[indicator].enabled = True + + logger.info("SLC setup completed") + + # 4.11.2 Audio Connection Setup by HF + async def setup_audio_connection(self): + # When the HF triggers the establishment of the Codec Connection it + # shall send the AT command AT+BCC to the AG. The AG shall respond with + # OK if it will start the Codec Connection procedure, and with ERROR + # if it cannot start the Codec Connection procedure. + await self.execute_command("AT+BCC") + + # 4.11.3 Codec Connection Setup + async def setup_codec_connection(self, codec_id: int): + # The AG shall send a +BCS= unsolicited response to the HF. + # The HF shall then respond to the incoming unsolicited response with + # the AT command AT+BCS=. The ID shall be the same as in the + # unsolicited response code as long as the ID is supported. + # If the received ID is not available, the HF shall respond with + # AT+BAC with its available codecs. + if codec_id not in self.supported_audio_codecs: + codecs = [str(c) for c in self.supported_audio_codecs] + await self.execute_command(f"AT+BAC={','.join(codecs)}") + return + + await self.execute_command(f"AT+BCS={codec_id}") + + # After sending the OK response, the AG shall open the + # Synchronous Connection with the settings that are determined by the + # ID. The HF shall be ready to accept the synchronous connection + # establishment as soon as it has sent the AT commands AT+BCS=. + + logger.info("codec connection setup completed") + + # 4.13.1 Answer Incoming Call from the HF – In-Band Ringing + async def answer_incoming_call(self): + # The user accepts the incoming voice call by using the proper means + # provided by the HF. The HF shall then send the ATA command + # (see Section 4.34) to the AG. The AG shall then begin the procedure for + # accepting the incoming call. + await self.execute_command("ATA") + + # 4.14.1 Reject an Incoming Call from the HF + async def reject_incoming_call(self): + # The user rejects the incoming call by using the User Interface on the + # Hands-Free unit. The HF shall then send the AT+CHUP command + # (see Section 4.34) to the AG. This may happen at any time during the + # procedures described in Sections 4.13.1 and 4.13.2. + await self.execute_command("AT+CHUP") + + # 4.15.1 Terminate a Call Process from the HF + async def terminate_call(self): + # The user may abort the ongoing call process using whatever means + # provided by the Hands-Free unit. The HF shall send AT+CHUP command + # (see Section 4.34) to the AG, and the AG shall then start the + # procedure to terminate or interrupt the current call procedure. + # The AG shall then send the OK indication followed by the +CIEV result + # code, with the value indicating (call=0). + await self.execute_command("AT+CHUP") + + async def update_ag_indicator(self, index: int, value: int): + self.ag_indicators[index].current_status = value + logger.info( + f"AG indicator updated: {self.ag_indicators[index].description}, {value}" + ) + + async def handle_unsolicited(self): + """Handle unsolicited result codes sent by the audio gateway.""" + result = await self.unsolicited_queue.get() + if result.code == "+BCS": + await self.setup_codec_connection(int(result.parameters[0])) + elif result.code == "+CIEV": + await self.update_ag_indicator( + int(result.parameters[0]), int(result.parameters[1]) + ) + else: + logging.info(f"unhandled unsolicited response {result.code}") + + async def run(self): + """Main rountine for the Hands-Free side of the HFP protocol. + Initiates the service level connection then loops handling + unsolicited AG responses.""" + + try: + await self.initiate_slc() + while True: + await self.handle_unsolicited() + except Exception: + logger.error("HFP-HF protocol failed with the following error:") + logger.error(traceback.format_exc()) + + +# ----------------------------------------------------------------------------- +# Normative SDP definitions +# ----------------------------------------------------------------------------- + + +# Profile version (normative). +# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements +class ProfileVersion(enum.IntEnum): + V1_5 = 0x0105 + V1_6 = 0x0106 + V1_7 = 0x0107 + V1_8 = 0x0108 + V1_9 = 0x0109 + + +# HF supported features (normative). +# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements +class HfSdpFeature(enum.IntFlag): + EC_NR = 0x01 # Echo Cancel & Noise reduction + THREE_WAY_CALLING = 0x02 + CLI_PRESENTATION_CAPABILITY = 0x04 + VOICE_RECOGNITION_ACTIVATION = 0x08 + REMOTE_VOLUME_CONTROL = 0x10 + WIDE_BAND = 0x20 # Wide band speech + ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 + VOICE_RECOGNITION_TEST = 0x80 + + +# AG supported features (normative). +# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements +class AgSdpFeature(enum.IntFlag): + THREE_WAY_CALLING = 0x01 + EC_NR = 0x02 # Echo Cancel & Noise reduction + VOICE_RECOGNITION_FUNCTION = 0x04 + IN_BAND_RING_TONE_CAPABILITY = 0x08 + VOICE_TAG = 0x10 # Attach a number to voice tag + WIDE_BAND = 0x20 # Wide band speech + ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 + VOICE_RECOGNITION_TEST = 0x80 + + +def sdp_records( + service_record_handle: int, rfcomm_channel: int, configuration: Configuration +) -> List[ServiceAttribute]: + """Generate the SDP record for HFP Hands-Free support. + The record exposes the features supported in the input configuration, + and the allocated RFCOMM channel.""" + + hf_supported_features = 0 + + if HfFeature.EC_NR in configuration.supported_hf_features: + hf_supported_features |= HfSdpFeature.EC_NR + if HfFeature.THREE_WAY_CALLING in configuration.supported_hf_features: + hf_supported_features |= HfSdpFeature.THREE_WAY_CALLING + if HfFeature.CLI_PRESENTATION_CAPABILITY in configuration.supported_hf_features: + hf_supported_features |= HfSdpFeature.CLI_PRESENTATION_CAPABILITY + if HfFeature.VOICE_RECOGNITION_ACTIVATION in configuration.supported_hf_features: + hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_ACTIVATION + if HfFeature.REMOTE_VOLUME_CONTROL in configuration.supported_hf_features: + hf_supported_features |= HfSdpFeature.REMOTE_VOLUME_CONTROL + if ( + HfFeature.ENHANCED_VOICE_RECOGNITION_STATUS + in configuration.supported_hf_features + ): + hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS + if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features: + hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST + + if AudioCodec.MSBC in configuration.supported_audio_codecs: + hf_supported_features |= HfSdpFeature.WIDE_BAND + + return [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(service_record_handle), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.uuid(BT_HANDSFREE_SERVICE), + DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + ] + ), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), + DataElement.sequence( + [ + DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + DataElement.unsigned_integer_8(rfcomm_channel), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_HANDSFREE_SERVICE), + DataElement.unsigned_integer_16(ProfileVersion.V1_8), + ] + ) + ] + ), + ), + ServiceAttribute( + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(hf_supported_features), + ), + ] diff --git a/bumble/sdp.py b/bumble/sdp.py index 019b8e6f..1d4faf9a 100644 --- a/bumble/sdp.py +++ b/bumble/sdp.py @@ -94,6 +94,10 @@ SDP_ICON_URL_ATTRIBUTE_ID = 0X000C SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0X000D +# Attribute Identifier (cf. Assigned Numbers for Service Discovery) +# used by AVRCP, HFP and A2DP +SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID = 0x0311 + SDP_ATTRIBUTE_ID_NAMES = { SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: 'SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID', SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID: 'SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID', diff --git a/examples/run_hfp_gateway.py b/examples/run_hfp_gateway.py index 63a2a7c3..eac54737 100644 --- a/examples/run_hfp_gateway.py +++ b/examples/run_hfp_gateway.py @@ -16,9 +16,11 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import collections import sys import os import logging +from typing import Union from bumble.colors import color @@ -30,6 +32,7 @@ BT_RFCOMM_PROTOCOL_ID, BT_BR_EDR_TRANSPORT, ) +from bumble import rfcomm from bumble.rfcomm import Client from bumble.sdp import ( Client as SDP_Client, @@ -39,7 +42,64 @@ SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, ) -from bumble.hfp import HfpProtocol + + +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Protocol Support +# ----------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- +class HfpProtocol: + dlc: rfcomm.DLC + buffer: str + lines: collections.deque + lines_available: asyncio.Event + + def __init__(self, dlc: rfcomm.DLC) -> None: + self.dlc = dlc + self.buffer = '' + self.lines = collections.deque() + self.lines_available = asyncio.Event() + + dlc.sink = self.feed + + def feed(self, data: Union[bytes, str]) -> None: + # Convert the data to a string if needed + if isinstance(data, bytes): + data = data.decode('utf-8') + + logger.debug(f'<<< Data received: {data}') + + # Add to the buffer and look for lines + self.buffer += data + while (separator := self.buffer.find('\r')) >= 0: + line = self.buffer[:separator].strip() + self.buffer = self.buffer[separator + 1 :] + if len(line) > 0: + self.on_line(line) + + def on_line(self, line: str) -> None: + self.lines.append(line) + self.lines_available.set() + + def send_command_line(self, line: str) -> None: + logger.debug(color(f'>>> {line}', 'yellow')) + self.dlc.write(line + '\r') + + def send_response_line(self, line: str) -> None: + logger.debug(color(f'>>> {line}', 'yellow')) + self.dlc.write('\r\n' + line + '\r\n') + + async def next_line(self) -> str: + await self.lines_available.wait() + line = self.lines.popleft() + if not self.lines: + self.lines_available.clear() + logger.debug(color(f'<<< {line}', 'green')) + return line # ----------------------------------------------------------------------------- diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py index cef29c0d..5f747fcf 100644 --- a/examples/run_hfp_handsfree.py +++ b/examples/run_hfp_handsfree.py @@ -21,82 +21,22 @@ import logging import json import websockets - +from typing import Optional from bumble.device import Device from bumble.transport import open_transport_or_link from bumble.rfcomm import Server as RfcommServer -from bumble.sdp import ( - DataElement, - ServiceAttribute, - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, -) -from bumble.core import ( - BT_GENERIC_AUDIO_SERVICE, - BT_HANDSFREE_SERVICE, - BT_L2CAP_PROTOCOL_ID, - BT_RFCOMM_PROTOCOL_ID, -) -from bumble.hfp import HfpProtocol - - -# ----------------------------------------------------------------------------- -def make_sdp_records(rfcomm_channel): - return { - 0x00010001: [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(0x00010001), - ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), - ] - ), - ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), - DataElement.sequence( - [ - DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), - DataElement.unsigned_integer_8(rfcomm_channel), - ] - ), - ] - ), - ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.unsigned_integer_16(0x0105), - ] - ) - ] - ), - ), - ] - } +from bumble import hfp +from bumble.hfp import HfProtocol # ----------------------------------------------------------------------------- class UiServer: - protocol = None + protocol: Optional[HfProtocol] = None async def start(self): - # Start a Websocket server to receive events from a web page + """Start a Websocket server to receive events from a web page.""" + async def serve(websocket, _path): while True: try: @@ -107,7 +47,7 @@ async def serve(websocket, _path): message_type = parsed['type'] if message_type == 'at_command': if self.protocol is not None: - self.protocol.send_command_line(parsed['command']) + await self.protocol.execute_command(parsed['command']) except websockets.exceptions.ConnectionClosedOK: pass @@ -117,19 +57,11 @@ async def serve(websocket, _path): # ----------------------------------------------------------------------------- -async def protocol_loop(protocol): - await protocol.initialize_service() - - while True: - await (protocol.next_line()) - - -# ----------------------------------------------------------------------------- -def on_dlc(dlc): +def on_dlc(dlc, configuration: hfp.Configuration): print('*** DLC connected', dlc) - protocol = HfpProtocol(dlc) + protocol = HfProtocol(dlc, configuration) UiServer.protocol = protocol - asyncio.create_task(protocol_loop(protocol)) + asyncio.create_task(protocol.run()) # ----------------------------------------------------------------------------- @@ -143,6 +75,27 @@ async def main(): async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): print('<<< connected') + # Hands-Free profile configuration. + # TODO: load configuration from file. + configuration = hfp.Configuration( + supported_hf_features=[ + hfp.HfFeature.THREE_WAY_CALLING, + hfp.HfFeature.REMOTE_VOLUME_CONTROL, + hfp.HfFeature.ENHANCED_CALL_STATUS, + hfp.HfFeature.ENHANCED_CALL_CONTROL, + hfp.HfFeature.CODEC_NEGOTIATION, + hfp.HfFeature.HF_INDICATORS, + hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, + ], + supported_hf_indicators=[ + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_audio_codecs=[ + hfp.AudioCodec.CVSD, + hfp.AudioCodec.MSBC, + ], + ) + # Create a device device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device.classic_enabled = True @@ -151,11 +104,13 @@ async def main(): rfcomm_server = RfcommServer(device) # Listen for incoming DLC connections - channel_number = rfcomm_server.listen(on_dlc) + channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration)) print(f'### Listening for connection on channel {channel_number}') # Advertise the HFP RFComm channel in the SDP - device.sdp_service_records = make_sdp_records(channel_number) + device.sdp_service_records = { + 0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration) + } # Let's go! await device.power_on() diff --git a/tests/at_test.py b/tests/at_test.py new file mode 100644 index 00000000..a0f00dd9 --- /dev/null +++ b/tests/at_test.py @@ -0,0 +1,35 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bumble import at + + +def test_tokenize_parameters(): + assert at.tokenize_parameters(b'1, 2, 3') == [b'1', b',', b'2', b',', b'3'] + assert at.tokenize_parameters(b'"1, 2, 3"') == [b'1, 2, 3'] + assert at.tokenize_parameters(b'(1, "2, 3")') == [b'(', b'1', b',', b'2, 3', b')'] + + +def test_parse_parameters(): + assert at.parse_parameters(b'1, 2, 3') == [b'1', b'2', b'3'] + assert at.parse_parameters(b'1,, 3') == [b'1', b'', b'3'] + assert at.parse_parameters(b'"1, 2, 3"') == [b'1, 2, 3'] + assert at.parse_parameters(b'1, (2, (3))') == [b'1', [b'2', [b'3']]] + assert at.parse_parameters(b'1, (2, "3, 4"), 5') == [b'1', [b'2', b'3, 4'], b'5'] + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + test_tokenize_parameters() + test_parse_parameters()