diff --git a/communications/commands.py b/communications/commands.py index d20b3302..39ae8edf 100644 --- a/communications/commands.py +++ b/communications/commands.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from utils.constants import MAX_COMMAND_SIZE, MIN_COMMAND_SIZE, CommandEnum +from utils.constants import MAX_COMMAND_DATA_SIZE, CommandEnum from utils.exceptions import CommandException from typing import TYPE_CHECKING, Union, Dict, List, Any, Optional from communications.codec import Codec @@ -44,22 +44,14 @@ class Command(ABC): def __init__(self) -> None: self.uplink_buffer_size = sum([codec.num_bytes for codec in self.uplink_codecs]) - if self.uplink_buffer_size > MAX_COMMAND_SIZE - MIN_COMMAND_SIZE: - logging.error( - f"Buffer size too big: {self.uplink_buffer_size} > {MAX_COMMAND_SIZE - MIN_COMMAND_SIZE}" - ) - self.downlink_buffer_size = sum( - [codec.num_bytes for codec in self.downlink_codecs] - ) + if self.uplink_buffer_size > MAX_COMMAND_DATA_SIZE: + logging.error(f"Buffer size too big: {self.uplink_buffer_size} > {MAX_COMMAND_DATA_SIZE}") + self.downlink_buffer_size = sum([codec.num_bytes for codec in self.downlink_codecs]) self.uplink_codecs_dict = {codec.name: codec for codec in self.uplink_codecs} - self.downlink_codecs_dict = { - codec.name: codec for codec in self.downlink_codecs - } + self.downlink_codecs_dict = {codec.name: codec for codec in self.downlink_codecs} @abstractmethod - def _method( - self, main: Optional[MainSatelliteThread] = None, **kwargs - ) -> Optional[Dict[str, Union[float, int]]]: + def _method(self, main: Optional[MainSatelliteThread] = None, **kwargs) -> Optional[Dict[str, Union[float, int]]]: ... @staticmethod @@ -74,9 +66,7 @@ def _unpack(data: bytes, codec_dict: Dict[str, Codec]) -> Dict[str, Any]: return kwargs @staticmethod - def _pack( - kwargs: Dict[str, Any], codecs_dict: Dict[str, Codec], buffer_size: int - ) -> bytes: + def _pack(kwargs: Dict[str, Any], codecs_dict: Dict[str, Codec], buffer_size: int) -> bytes: buffer = bytearray(buffer_size) offset = 0 @@ -97,9 +87,7 @@ def unpack_telem(self, arg_data: bytes) -> Dict[str, Any]: return self._unpack(arg_data, self.downlink_codecs_dict) def pack_telem(self, telem_dict) -> bytes: - return self._pack( - telem_dict, self.downlink_codecs_dict, self.downlink_buffer_size - ) + return self._pack(telem_dict, self.downlink_codecs_dict, self.downlink_buffer_size) def packing_check(self, telem: Optional[Dict], codec_list: List[Codec]): error = False diff --git a/utils/constants.py b/utils/constants.py index 30c362bf..7940b69c 100644 --- a/utils/constants.py +++ b/utils/constants.py @@ -24,12 +24,15 @@ # SQL Stuff DB_ENTRY_LIMIT = 1000 # TODO update # maximum number of entries in any of the databases +# TODO: change to parameter + DB_FILE = "sqlite:///" + os.path.join(CISLUNAR_BASE_DIR, "satellite-db.sqlite") LOG_DIR = os.path.join(CISLUNAR_BASE_DIR, "logs") NEMO_DIR = os.path.join(CISLUNAR_BASE_DIR, "nemo") # Delay to wait on BootUp # TODO change back to 30.0 +# TODO: change to parameter BOOTUP_SEPARATION_DELAY = 5.0 # seconds # Verification Key Parameters @@ -47,7 +50,10 @@ DATA_LEN_SIZE = 1 MIN_COMMAND_SIZE = MAC_LENGTH + COUNTER_SIZE + ID_SIZE + DATA_LEN_SIZE -MAX_COMMAND_SIZE = 200 # Maximum command size based on the radio board's buffer size. +AX5043_BUFFER_SIZE = 256 + +MAX_COMMAND_DATA_SIZE = AX5043_BUFFER_SIZE - MIN_COMMAND_SIZE +# Maximum command size based on the radio board's buffer size. # TODO: figure out what this number _actually_ is # Serializations Offsets