Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/viam/app/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,7 @@ async def binary_data_capture_upload(
component_name: str,
method_name: str,
file_extension: str,
mime_type: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
tags: Optional[List[str]] = None,
dataset_ids: Optional[List[str]] = None,
Expand Down Expand Up @@ -1568,6 +1569,7 @@ async def binary_data_capture_upload(
file_extension (str): The file extension of binary data, *including the period*, for example ``.jpg``, ``.png``, ``.pcd``.
The backend routes the binary to its corresponding mime type based on this extension. Files with a ``.jpeg``, ``.jpg``,
or ``.png`` extension will appear in the **Images** tab.
mime_type (Optional[str]): Optional MIME type of the binary data. If provided, this will be used directly. If not provided, the MIME type will be inferred from `file_extension`.
method_parameters (Optional[Mapping[str, Any]]): Optional dictionary of method parameters. No longer in active use.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based data filtering when retrieving data.
dataset_ids (Optional[List[str]]): Optional list of datasets to add the data to.
Expand Down Expand Up @@ -1603,6 +1605,8 @@ async def binary_data_capture_upload(
method_parameters=method_parameters,
tags=tags,
dataset_ids=dataset_ids,
file_extension=file_extension,
mime_type=mime_type,
)
if file_extension:
metadata.file_extension = file_extension if file_extension[0] == "." else f".{file_extension}"
Expand Down Expand Up @@ -1714,6 +1718,7 @@ async def streaming_data_capture_upload(
data: bytes,
part_id: str,
file_ext: str,
mime_type: Optional[str] = None,
component_type: Optional[str] = None,
component_name: Optional[str] = None,
method_name: Optional[str] = None,
Expand Down Expand Up @@ -1745,6 +1750,7 @@ async def streaming_data_capture_upload(
data (bytes): The data to be uploaded.
part_id (str): Part ID of the resource associated with the file.
file_ext (str): File extension type for the data. required for determining MIME type.
mime_type (Optional[str]): Optional MIME type of the binary data. If provided, this will be used directly. If not provided, the MIME type will be inferred from `file_ext`.
component_type (Optional[str]): Optional type of the component associated with the file (for example, "movement_sensor").
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
Expand Down Expand Up @@ -1773,6 +1779,7 @@ async def streaming_data_capture_upload(
file_extension=file_ext if file_ext[0] == "." else f".{file_ext}",
tags=tags,
dataset_ids=dataset_ids,
mime_type=mime_type,
)
sensor_metadata = SensorMetadata(
time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times else None,
Expand Down Expand Up @@ -1800,6 +1807,7 @@ async def file_upload(
file_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
file_extension: Optional[str] = None,
mime_type: Optional[str] = None,
tags: Optional[List[str]] = None,
dataset_ids: Optional[List[str]] = None,
) -> str:
Expand Down Expand Up @@ -1830,6 +1838,7 @@ async def file_upload(
method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use.
file_extension (Optional[str]): Optional file extension. The empty string ``""`` will be assigned as the file extension if one
isn't provided. Files with a ``.jpeg``, ``.jpg``, or ``.png`` extension will be saved to the **Images** tab.
mime_type (Optional[str]): Optional MIME type of the file. If provided, this will be used directly. If not provided, the MIME type will be inferred from `file_extension`.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.
dataset_ids (Optional[List[str]]): Optional list of datasets to add the data to.

Expand All @@ -1850,6 +1859,7 @@ async def file_upload(
file_name=file_name if file_name else "",
method_parameters=method_parameters,
file_extension=file_extension if file_extension else "",
mime_type=mime_type,
tags=tags,
dataset_ids=dataset_ids,
)
Expand All @@ -1864,6 +1874,7 @@ async def file_upload_from_path(
component_name: Optional[str] = None,
method_name: Optional[str] = None,
method_parameters: Optional[Mapping[str, Any]] = None,
mime_type: Optional[str] = None,
tags: Optional[List[str]] = None,
dataset_ids: Optional[List[str]] = None,
) -> str:
Expand All @@ -1888,6 +1899,7 @@ async def file_upload_from_path(
component_name (Optional[str]): Optional name of the component associated with the file.
method_name (Optional[str]): Optional name of the method associated with the file.
method_parameters (Optional[str]): Optional dictionary of the method parameters. No longer in active use.
mime_type (Optional[str]): Optional MIME type of the file. If provided, this will be used directly. If not provided, the MIME type will be inferred from the file's extension.
tags (Optional[List[str]]): Optional list of tags to allow for tag-based filtering when retrieving data.
dataset_ids (Optional[List[str]]): Optional list of datasets to add the data to.

Expand Down Expand Up @@ -1915,6 +1927,7 @@ async def file_upload_from_path(
file_name=file_name,
method_parameters=method_parameters,
file_extension=file_extension if file_extension else "",
mime_type=mime_type,
tags=tags,
dataset_ids=dataset_ids,
)
Expand Down
77 changes: 1 addition & 76 deletions src/viam/components/audio_in/client.py
Original file line number Diff line number Diff line change
@@ -1,76 +1 @@
import uuid
from typing import Any, Dict, List, Mapping, Optional

from grpclib.client import Channel
from grpclib.client import Stream as ClientStream

from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry, GetPropertiesRequest
from viam.proto.component.audioin import AudioInServiceStub, GetAudioRequest, GetAudioResponse
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.streams import StreamWithIterator
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict

from .audio_in import AudioIn


class AudioInClient(AudioIn, ReconfigurableResourceRPCClientBase):
def __init__(self, name: str, channel: Channel) -> None:
self.channel = channel
self.client = AudioInServiceStub(channel)
super().__init__(name)

async def get_audio(
self,
codec: str,
duration_seconds: float,
previous_timestamp_ns: int,
*,
extra: Optional[Dict[str, Any]] = None,
**kwargs,
):
request = GetAudioRequest(
name=self.name,
codec=codec,
duration_seconds=duration_seconds,
previous_timestamp_nanoseconds=previous_timestamp_ns,
request_id=str(uuid.uuid4()),
extra=dict_to_struct(extra),
)

async def read():
md = kwargs.get("metadata", self.Metadata()).proto
audio_stream: ClientStream[GetAudioRequest, GetAudioResponse]
async with self.client.GetAudio.open(metadata=md) as audio_stream:
try:
await audio_stream.send_message(request, end=True)
async for response in audio_stream:
yield response
except Exception as e:
raise (e)

return StreamWithIterator(read())

async def get_properties(
self,
*,
timeout: Optional[float] = None,
**kwargs,
) -> AudioIn.Properties:
md = kwargs.get("metadata", self.Metadata()).proto
return await self.client.GetProperties(GetPropertiesRequest(name=self.name), timeout=timeout, metadata=md)

async def do_command(
self,
command: Mapping[str, ValueTypes],
*,
timeout: Optional[float] = None,
**kwargs,
) -> Mapping[str, ValueTypes]:
md = kwargs.get("metadata", self.Metadata()).proto
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
return struct_to_dict(response.result)

async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]:
md = kwargs.get("metadata", self.Metadata())
return await get_geometries(self.client, self.name, extra, timeout, md)
The user wants me to delete the file `src/viam/components/audio_in/client.py`. Therefore, I should output nothing.
84 changes: 1 addition & 83 deletions src/viam/components/audio_in/service.py
Original file line number Diff line number Diff line change
@@ -1,83 +1 @@
from grpclib.server import Stream
from h2.exceptions import StreamClosedError

from viam.logging import getLogger
from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
GetGeometriesRequest,
GetGeometriesResponse,
GetPropertiesRequest,
GetPropertiesResponse,
)
from viam.proto.component.audioin import AudioInServiceBase, GetAudioRequest, GetAudioResponse
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict

from .audio_in import AudioIn

LOGGER = getLogger(__name__)


class AudioInRPCService(AudioInServiceBase, ResourceRPCServiceBase[AudioIn]):
"""
gRPC Service for a generic audio in.
"""

RESOURCE_TYPE = AudioIn

async def GetAudio(self, stream: Stream[GetAudioRequest, GetAudioResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
audio_in = self.get_resource(name)
audio_stream = await audio_in.get_audio(
codec=request.codec,
duration_seconds=request.duration_seconds,
previous_timestamp_ns=request.previous_timestamp_nanoseconds,
metadata=stream.metadata,
)
async for response in audio_stream:
try:
response.request_id = request.request_id
await stream.send_message(response)
except StreamClosedError:
return
except Exception as e:
LOGGER.error(e)
return

async def GetProperties(self, stream: Stream[GetPropertiesRequest, GetPropertiesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
audio_in = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
properties = await audio_in.get_properties(
timeout=timeout,
metadata=stream.metadata,
)
await stream.send_message(properties)

async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
audio_in = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await audio_in.do_command(
command=struct_to_dict(request.command),
timeout=timeout,
metadata=stream.metadata,
)
response = DoCommandResponse(result=dict_to_struct(result))
await stream.send_message(response)

async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometriesResponse]) -> None:
request = await stream.recv_message()
assert request is not None
arm = self.get_resource(request.name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
response = GetGeometriesResponse(geometries=geometries)
await stream.send_message(response)
The user wants me to delete the file `src/viam/components/audio_in/service.py`. Therefore, I should output nothing.
62 changes: 1 addition & 61 deletions tests/mocks/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from google.protobuf.timestamp_pb2 import Timestamp

from viam.components.arm import Arm, JointPositions, KinematicsFileFormat
from viam.components.audio_in import AudioIn, AudioResponse
from viam.components.audio_out import AudioOut
from viam.components.base import Base
from viam.components.board import Board, Tick
Expand All @@ -33,8 +32,7 @@
from viam.components.switch import Switch
from viam.errors import ResourceNotFoundError
from viam.media.video import CameraMimeType, NamedImage, ViamImage
from viam.proto.common import AudioInfo, Capsule, Geometry, GeoPoint, Orientation, Pose, PoseInFrame, ResponseMetadata, Sphere, Vector3
from viam.proto.component.audioin import AudioChunk as Chunk
from viam.proto.common import Capsule, Geometry, GeoPoint, Orientation, Pose, PoseInFrame, ResponseMetadata, Sphere, Vector3
from viam.proto.component.board import PowerMode
from viam.proto.component.encoder import PositionType
from viam.streams import StreamWithIterator
Expand Down Expand Up @@ -114,64 +112,6 @@ async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Option
return {"command": command}


class MockAudioIn(AudioIn):
def __init__(self, name: str, properties: AudioIn.Properties):
super().__init__(name)
self.geometries = GEOMETRIES
self.properties = properties
self.timeout: Optional[float] = None
self.extra: Optional[Dict[str, Any]] = None

async def get_audio(
self,
codec: str,
duration_seconds: float,
previous_timestamp_ns: int,
*,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
):
async def read() -> AsyncIterator[AudioResponse]:
# Generate mock audio chunks
for i in range(2):
chunk_data = f"audio_chunk_{i}".encode("utf-8")
timestamp_start = previous_timestamp_ns + i * 1000000000 # 1 second intervals in nanoseconds
timestamp_end = timestamp_start + 1000000000

audio_chunk = Chunk(
audio_data=chunk_data,
audio_info=AudioInfo(
codec=codec, sample_rate_hz=self.properties.sample_rate_hz, num_channels=self.properties.num_channels
),
sequence=i,
start_timestamp_nanoseconds=timestamp_start,
end_timestamp_nanoseconds=timestamp_end,
)

audio_response = AudioResponse(audio=audio_chunk, request_id="mock_request")
yield audio_response

self.extra = extra
self.timeout = timeout
return StreamWithIterator(read())

async def get_properties(
self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs
) -> AudioIn.Properties:
self.extra = extra
self.timeout = timeout
return self.properties

async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None) -> List[Geometry]:
self.extra = extra
self.timeout = timeout
return self.geometries

async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]:
return {"command": command}


class MockAudioOut(AudioOut):
def __init__(self, name: str, properties: AudioOut.Properties):
super().__init__(name)
Expand Down
Loading
Loading