11from __future__ import annotations
22from collections .abc import Callable
3+ from types import MappingProxyType
34import logging
45
5- from .btp_header import BTPAHeader , BTPBHeader
6+ from .btp_header import BTPBHeader
67from .service_access_point import BTPDataIndication , BTPDataRequest
78from ..geonet .common_header import CommonNH
89from ..geonet .service_access_point import GNDataIndication , GNDataRequest
@@ -17,20 +18,29 @@ class Router:
1718
1819 Attributes
1920 ----------
20- indication_callbacks : Dict [int, Callable[[BTPDataIndication], None]]
21- Dictionary of indication callbacks. The key is the port and the value is the callback.
21+ indication_callbacks : MappingProxyType [int, Callable[[BTPDataIndication], None]] | None
22+ MappingProxyType of indication callbacks. The key is the port and the value is the callback.
2223 gn_router : GNRouter
2324 Geonetworking Router.
2425 """
2526
2627 def __init__ (self , gn_router : GNRouter ) -> None :
2728 self .logging = logging .getLogger ("btp" )
28-
29- self .indication_callbacks : dict [int , Callable [[BTPDataIndication ], None ]] = {}
29+ self .pre_indication_callbacks : dict [int ,
30+ Callable [[BTPDataIndication ], None ]] = {}
31+ self .indication_callbacks : MappingProxyType [int , Callable [[
32+ BTPDataIndication ], None ]] | None = None
3033 self .gn_router = gn_router
3134
3235 self .logging .info ("BTP Router Initialized!" )
3336
37+ def freeze_callbacks (self ):
38+ """
39+ Freezes the indication callbacks. After calling this method, no more callbacks can be registered.
40+ """
41+ self .indication_callbacks = MappingProxyType (
42+ dict (self .pre_indication_callbacks ))
43+
3444 def register_indication_callback_btp (
3545 self , port : int , callback : Callable [[BTPDataIndication ], None ]
3646 ) -> None :
@@ -44,7 +54,9 @@ def register_indication_callback_btp(
4454 callback : Callable[[BTPDataIndication], None]
4555 Callback to register.
4656 """
47- self .indication_callbacks [port ] = callback
57+ if self .indication_callbacks is not None :
58+ raise RuntimeError ("Cannot register callbacks after freezing" )
59+ self .pre_indication_callbacks [port ] = callback
4860 self .logging .info ("Indication callback registered" )
4961
5062 def btp_data_request (self , request : BTPDataRequest ) -> None :
@@ -57,19 +69,22 @@ def btp_data_request(self, request: BTPDataRequest) -> None:
5769 BTPDataRequest to handle.
5870 """
5971 if request .btp_type == CommonNH .BTP_B :
60- header = BTPBHeader ()
61- header .destination_port_info = request .destinaion_port_info
62- header .destination_port = request .destination_port
63- gn_data_request = GNDataRequest ()
64- gn_data_request .upper_protocol_entity = request .btp_type
65- gn_data_request .packet_transport_type = request .gn_packet_transport_type
66- gn_data_request .area = request .gn_area
67- gn_data_request .communication_profile = request .communication_profile
68- gn_data_request .traffic_class = request .traffic_class
69- gn_data_request .data = header .encode () + request .data
70- gn_data_request .length = len (gn_data_request .data )
71-
72- self .logging .debug ("Sending BTP Data Request: %s" , gn_data_request .data )
72+ header = BTPBHeader (
73+ destination_port = request .destination_port ,
74+ destination_port_info = request .destination_port_info ,
75+ )
76+ data = header .encode () + request .data
77+ gn_data_request = GNDataRequest (
78+ upper_protocol_entity = request .btp_type ,
79+ packet_transport_type = request .gn_packet_transport_type ,
80+ area = request .gn_area ,
81+ communication_profile = request .communication_profile ,
82+ traffic_class = request .traffic_class ,
83+ data = data ,
84+ length = len (data )
85+ )
86+ self .logging .debug (
87+ "Sending BTP Data Request: %s" , gn_data_request .data )
7388 self .gn_router .gn_data_request (gn_data_request )
7489 elif request .btp_type == CommonNH .BTP_A :
7590 raise NotImplementedError ("BTPADataRequest not implemented" )
@@ -85,16 +100,20 @@ def btp_b_data_indication(self, gn_data_indication: GNDataIndication) -> None:
85100 gn_data_indication : GNDataIndication
86101 GNDataIndication to handle.
87102 """
88- indication = BTPDataIndication ()
89- indication .initialize_with_gn_data_indication (gn_data_indication )
90- header = BTPBHeader ()
91- header .decode (gn_data_indication .data )
92- indication .destination_port = header .destination_port
93- indication .destinaion_port_info = header .destination_port_info
94- for port , callback in self .indication_callbacks .items ():
95- if port == indication .destination_port :
96- self .logging .debug ("Sending BTP B Data Indication: %s" , indication .data )
103+ indication = BTPDataIndication .initialize_with_gn_data_indication (
104+ gn_data_indication )
105+ header = BTPBHeader .decode (gn_data_indication .data )
106+ indication = indication .set_destination_port_and_info (
107+ destination_port = header .destination_port ,
108+ destination_port_info = header .destination_port_info
109+ )
110+ if self .indication_callbacks :
111+ callback = self .indication_callbacks .get (
112+ indication .destination_port )
113+ if callback :
97114 callback (indication )
115+ else :
116+ raise RuntimeError ("Indication callbacks not frozen" )
98117
99118 def btp_a_data_indication (self , gn_data_indication : GNDataIndication ) -> None :
100119 """
@@ -105,8 +124,6 @@ def btp_a_data_indication(self, gn_data_indication: GNDataIndication) -> None:
105124 gn_data_indication : GNDataIndication
106125 GNDataIndication to handle.
107126 """
108- header = BTPAHeader ()
109- header .decode (gn_data_indication .data )
110127 raise NotImplementedError ("BTPADataIndication not implemented" )
111128
112129 def btp_data_indication (self , gn_data_indication : GNDataIndication ) -> None :
0 commit comments