blob: 8db1a56026b626c86bb0a19d3be83c174755054e [file] [log] [blame]
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import grpc
import logging
import struct
from bumble.core import AdvertisingData
from bumble.device import Connection, Connection as BumbleConnection, Device
from bumble.gatt import (
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
GATT_ASHA_SERVICE,
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic,
CharacteristicValue,
TemplateService,
)
from bumble.l2cap import Channel
from bumble.pandora import utils
from bumble.utils import AsyncRunner
from google.protobuf.empty_pb2 import Empty # pytype: disable=pyi-error
from pandora_experimental.asha_grpc_aio import AshaServicer
from pandora_experimental.asha_pb2 import CaptureAudioRequest, CaptureAudioResponse, RegisterRequest
from typing import AsyncGenerator, List, Optional
class AshaGattService(TemplateService):
# TODO: update bumble and remove this when complete
UUID = GATT_ASHA_SERVICE
OPCODE_START = 1
OPCODE_STOP = 2
OPCODE_STATUS = 3
PROTOCOL_VERSION = 0x01
RESERVED_FOR_FUTURE_USE = [00, 00]
FEATURE_MAP = [0x01] # [LE CoC audio output streaming supported]
SUPPORTED_CODEC_ID = [0x02, 0x01] # Codec IDs [G.722 at 16 kHz]
RENDER_DELAY = [00, 00]
def __init__(self, capability: int, hisyncid: List[int], device: Device, psm: int = 0) -> None:
self.hisyncid = hisyncid
self.capability = capability # Device Capabilities [Left, Monaural]
self.device = device
self.audio_out_data = b""
self.psm: int = psm # a non-zero psm is mainly for testing purpose
logger = logging.getLogger(__name__)
# Handler for volume control
def on_volume_write(connection: Connection, value: bytes) -> None:
logger.info(f"--- VOLUME Write:{value[0]}")
self.emit("volume", connection, value[0])
# Handler for audio control commands
def on_audio_control_point_write(connection: Connection, value: bytes) -> None:
logger.info(f"type {type(value)}")
logger.info(f"--- AUDIO CONTROL POINT Write:{value.hex()}")
opcode = value[0]
if opcode == AshaGattService.OPCODE_START:
# Start
audio_type = ("Unknown", "Ringtone", "Phone Call", "Media")[value[2]]
logger.info(
f"### START: codec={value[1]}, "
f"audio_type={audio_type}, "
f"volume={value[3]}, "
f"otherstate={value[4]}"
)
self.emit(
"start",
connection,
{
"codec": value[1],
"audiotype": value[2],
"volume": value[3],
"otherstate": value[4],
},
)
elif opcode == AshaGattService.OPCODE_STOP:
logger.info("### STOP")
self.emit("stop", connection)
elif opcode == AshaGattService.OPCODE_STATUS:
logger.info(f"### STATUS: connected={value[1]}")
# OPCODE_STATUS does not need audio status point update
if opcode != AshaGattService.OPCODE_STATUS:
AsyncRunner.spawn(device.notify_subscribers(self.audio_status_characteristic, force=True)) # type: ignore[no-untyped-call]
def on_read_only_properties_read(connection: Connection) -> bytes:
value = (
bytes(
[
AshaGattService.PROTOCOL_VERSION, # Version
self.capability,
]
)
+ bytes(self.hisyncid)
+ bytes(AshaGattService.FEATURE_MAP)
+ bytes(AshaGattService.RENDER_DELAY)
+ bytes(AshaGattService.RESERVED_FOR_FUTURE_USE)
+ bytes(AshaGattService.SUPPORTED_CODEC_ID)
)
self.emit("read_only_properties", connection, value)
return value
def on_le_psm_out_read(connection: Connection) -> bytes:
self.emit("le_psm_out", connection, self.psm)
return struct.pack("<H", self.psm)
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
CharacteristicValue(read=on_read_only_properties_read), # type: ignore[no-untyped-call]
)
self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write), # type: ignore[no-untyped-call]
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write), # type: ignore[no-untyped-call]
)
# Register an L2CAP CoC server
def on_coc(channel: Channel) -> None:
def on_data(data: bytes) -> None:
logging.debug(f"data received:{data.hex()}")
self.emit("data", channel.connection, data)
self.audio_out_data += data
channel.sink = on_data # type: ignore[no-untyped-call]
# let the server find a free PSM
self.psm = self.device.register_l2cap_channel_server(self.psm, on_coc, 8) # type: ignore[no-untyped-call]
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
CharacteristicValue(read=on_le_psm_out_read), # type: ignore[no-untyped-call]
)
characteristics = [
self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic,
]
super().__init__(characteristics) # type: ignore[no-untyped-call]
def get_advertising_data(self) -> bytes:
# Advertisement only uses 4 least significant bytes of the HiSyncId.
return bytes(
AdvertisingData(
[
(
AdvertisingData.SERVICE_DATA_16_BIT_UUID,
bytes(GATT_ASHA_SERVICE)
+ bytes(
[
AshaGattService.PROTOCOL_VERSION,
self.capability,
]
)
+ bytes(self.hisyncid[:4]),
),
]
)
)
class AshaService(AshaServicer):
device: Device
asha_service: Optional[AshaGattService]
def __init__(self, device: Device) -> None:
self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {"service_name": "Asha", "device": device})
self.device = device
self.asha_service = None
@utils.rpc
async def Register(self, request: RegisterRequest, context: grpc.ServicerContext) -> Empty:
logging.info("Register")
# asha service from bumble profile
self.asha_service = AshaGattService(request.capability, request.hisyncid, self.device)
self.device.add_service(self.asha_service) # type: ignore[no-untyped-call]
return Empty()
@utils.rpc
async def CaptureAudio(
self, request: CaptureAudioRequest, context: grpc.ServicerContext
) -> AsyncGenerator[CaptureAudioResponse, None]:
connection_handle = int.from_bytes(request.connection.cookie.value, "big")
logging.info(f"CaptureAudioData connection_handle:{connection_handle}")
if not (connection := self.device.lookup_connection(connection_handle)):
raise RuntimeError(f"Unknown connection for connection_handle:{connection_handle}")
queue: asyncio.Queue[bytes] = asyncio.Queue()
def on_data(asha_connection: BumbleConnection, data: bytes) -> None:
if asha_connection == connection:
queue.put_nowait(data)
self.asha_service.on("data", on_data) # type: ignore
try:
while data := await queue.get():
yield CaptureAudioResponse(data=data)
finally:
self.asha_service.remove_listener("data", on_data) # type: ignore