Skip to content
This repository was archived by the owner on Jun 13, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 8 additions & 20 deletions communications/commands.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from utils.constants import MAX_COMMAND_SIZE, MIN_COMMAND_SIZE, CommandEnum
from utils.constants import MAX_COMMAND_DATA_SIZE, CommandEnum
from utils.exceptions import CommandException
from typing import TYPE_CHECKING, Union, Dict, List, Any, Optional
from communications.codec import Codec
Expand Down Expand Up @@ -44,22 +44,14 @@ class Command(ABC):

def __init__(self) -> None:
self.uplink_buffer_size = sum([codec.num_bytes for codec in self.uplink_codecs])
if self.uplink_buffer_size > MAX_COMMAND_SIZE - MIN_COMMAND_SIZE:
logging.error(
f"Buffer size too big: {self.uplink_buffer_size} > {MAX_COMMAND_SIZE - MIN_COMMAND_SIZE}"
)
self.downlink_buffer_size = sum(
[codec.num_bytes for codec in self.downlink_codecs]
)
if self.uplink_buffer_size > MAX_COMMAND_DATA_SIZE:
logging.error(f"Buffer size too big: {self.uplink_buffer_size} > {MAX_COMMAND_DATA_SIZE}")
self.downlink_buffer_size = sum([codec.num_bytes for codec in self.downlink_codecs])
self.uplink_codecs_dict = {codec.name: codec for codec in self.uplink_codecs}
self.downlink_codecs_dict = {
codec.name: codec for codec in self.downlink_codecs
}
self.downlink_codecs_dict = {codec.name: codec for codec in self.downlink_codecs}

@abstractmethod
def _method(
self, main: Optional[MainSatelliteThread] = None, **kwargs
) -> Optional[Dict[str, Union[float, int]]]:
def _method(self, main: Optional[MainSatelliteThread] = None, **kwargs) -> Optional[Dict[str, Union[float, int]]]:
...

@staticmethod
Expand All @@ -74,9 +66,7 @@ def _unpack(data: bytes, codec_dict: Dict[str, Codec]) -> Dict[str, Any]:
return kwargs

@staticmethod
def _pack(
kwargs: Dict[str, Any], codecs_dict: Dict[str, Codec], buffer_size: int
) -> bytes:
def _pack(kwargs: Dict[str, Any], codecs_dict: Dict[str, Codec], buffer_size: int) -> bytes:
buffer = bytearray(buffer_size)
offset = 0

Expand All @@ -97,9 +87,7 @@ def unpack_telem(self, arg_data: bytes) -> Dict[str, Any]:
return self._unpack(arg_data, self.downlink_codecs_dict)

def pack_telem(self, telem_dict) -> bytes:
return self._pack(
telem_dict, self.downlink_codecs_dict, self.downlink_buffer_size
)
return self._pack(telem_dict, self.downlink_codecs_dict, self.downlink_buffer_size)

def packing_check(self, telem: Optional[Dict], codec_list: List[Codec]):
error = False
Expand Down
8 changes: 7 additions & 1 deletion utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@

# SQL Stuff
DB_ENTRY_LIMIT = 1000 # TODO update # maximum number of entries in any of the databases
# TODO: change to parameter

DB_FILE = "sqlite:///" + os.path.join(CISLUNAR_BASE_DIR, "satellite-db.sqlite")
LOG_DIR = os.path.join(CISLUNAR_BASE_DIR, "logs")
NEMO_DIR = os.path.join(CISLUNAR_BASE_DIR, "nemo")

# Delay to wait on BootUp
# TODO change back to 30.0
# TODO: change to parameter
BOOTUP_SEPARATION_DELAY = 5.0 # seconds

# Verification Key Parameters
Expand All @@ -47,7 +50,10 @@
DATA_LEN_SIZE = 1
MIN_COMMAND_SIZE = MAC_LENGTH + COUNTER_SIZE + ID_SIZE + DATA_LEN_SIZE

MAX_COMMAND_SIZE = 200 # Maximum command size based on the radio board's buffer size.
AX5043_BUFFER_SIZE = 256

MAX_COMMAND_DATA_SIZE = AX5043_BUFFER_SIZE - MIN_COMMAND_SIZE
# Maximum command size based on the radio board's buffer size.
# TODO: figure out what this number _actually_ is

# Serializations Offsets
Expand Down