Skip to content

Commit 20524b2

Browse files
authored
Expose call headers (#98)
1 parent dbe3463 commit 20524b2

File tree

3 files changed

+31
-15
lines changed

3 files changed

+31
-15
lines changed

examples/example.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
if __name__ == "__main__":
3030
import hypercorn
31+
import hypercorn.asyncio
3132
import asyncio
3233
conf = hypercorn.Config()
3334
conf.bind = ["0.0.0.0:9080"]

python/restate/context.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,9 @@ def sleep(self, delta: timedelta) -> RestateDurableSleepFuture:
292292
def service_call(self,
293293
tpe: Callable[[Any, I], Awaitable[O]],
294294
arg: I,
295-
idempotency_key: str | None = None) -> RestateDurableCallFuture[O]:
295+
idempotency_key: str | None = None,
296+
headers: typing.Dict[str, str] | None = None
297+
) -> RestateDurableCallFuture[O]:
296298
"""
297299
Invokes the given service with the given argument.
298300
"""
@@ -304,6 +306,7 @@ def service_send(self,
304306
arg: I,
305307
send_delay: Optional[timedelta] = None,
306308
idempotency_key: str | None = None,
309+
headers: typing.Dict[str, str] | None = None
307310
) -> SendHandle:
308311
"""
309312
Invokes the given service with the given argument.
@@ -315,6 +318,7 @@ def object_call(self,
315318
key: str,
316319
arg: I,
317320
idempotency_key: str | None = None,
321+
headers: typing.Dict[str, str] | None = None
318322
) -> RestateDurableCallFuture[O]:
319323
"""
320324
Invokes the given object with the given argument.
@@ -327,6 +331,7 @@ def object_send(self,
327331
arg: I,
328332
send_delay: Optional[timedelta] = None,
329333
idempotency_key: str | None = None,
334+
headers: typing.Dict[str, str] | None = None
330335
) -> SendHandle:
331336
"""
332337
Send a message to an object with the given argument.
@@ -338,6 +343,7 @@ def workflow_call(self,
338343
key: str,
339344
arg: I,
340345
idempotency_key: str | None = None,
346+
headers: typing.Dict[str, str] | None = None
341347
) -> RestateDurableCallFuture[O]:
342348
"""
343349
Invokes the given workflow with the given argument.
@@ -350,6 +356,7 @@ def workflow_send(self,
350356
arg: I,
351357
send_delay: Optional[timedelta] = None,
352358
idempotency_key: str | None = None,
359+
headers: typing.Dict[str, str] | None = None
353360
) -> SendHandle:
354361
"""
355362
Send a message to an object with the given argument.
@@ -362,7 +369,9 @@ def generic_call(self,
362369
handler: str,
363370
arg: bytes,
364371
key: Optional[str] = None,
365-
idempotency_key: str | None = None) -> RestateDurableCallFuture[bytes]:
372+
idempotency_key: str | None = None,
373+
headers: typing.Dict[str, str] | None = None
374+
) -> RestateDurableCallFuture[bytes]:
366375
"""
367376
Invokes the given generic service/handler with the given argument.
368377
"""
@@ -375,6 +384,7 @@ def generic_send(self,
375384
key: Optional[str] = None,
376385
send_delay: Optional[timedelta] = None,
377386
idempotency_key: str | None = None,
387+
headers: typing.Dict[str, str] | None = None
378388
) -> SendHandle:
379389
"""
380390
Send a message to a generic service/handler with the given argument.

python/restate/server_context.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ def do_call(self,
549549
send_delay: Optional[timedelta] = None,
550550
send: bool = False,
551551
idempotency_key: str | None = None,
552-
headers: typing.List[typing.Tuple[str, str]] | None = None
552+
headers: typing.Dict[str,str] | None = None
553553
) -> RestateDurableCallFuture[O] | SendHandle:
554554
"""Make an RPC call to the given handler"""
555555
target_handler = handler_from_callable(tpe)
@@ -570,24 +570,28 @@ def do_raw_call(self,
570570
send_delay: Optional[timedelta] = None,
571571
send: bool = False,
572572
idempotency_key: str | None = None,
573-
headers: typing.List[typing.Tuple[str, str]] | None = None
573+
headers: typing.Dict[str, str] | None = None
574574
) -> RestateDurableCallFuture[O] | SendHandle:
575575
"""Make an RPC call to the given handler"""
576576
parameter = input_serde.serialize(input_param)
577+
if headers is not None:
578+
headers_kvs = list(headers.items())
579+
else:
580+
headers_kvs = []
577581
if send_delay:
578582
ms = int(send_delay.total_seconds() * 1000)
579-
send_handle = self.vm.sys_send(service, handler, parameter, key, delay=ms, idempotency_key=idempotency_key, headers=headers)
583+
send_handle = self.vm.sys_send(service, handler, parameter, key, delay=ms, idempotency_key=idempotency_key, headers=headers_kvs)
580584
return ServerSendHandle(self, send_handle)
581585
if send:
582-
send_handle = self.vm.sys_send(service, handler, parameter, key, idempotency_key=idempotency_key, headers=headers)
586+
send_handle = self.vm.sys_send(service, handler, parameter, key, idempotency_key=idempotency_key, headers=headers_kvs)
583587
return ServerSendHandle(self, send_handle)
584588

585589
handle = self.vm.sys_call(service=service,
586590
handler=handler,
587591
parameter=parameter,
588592
key=key,
589593
idempotency_key=idempotency_key,
590-
headers=headers)
594+
headers=headers_kvs)
591595

592596
return self.create_call_future(handle=handle.result_handle,
593597
invocation_id_handle=handle.invocation_id_handle,
@@ -597,13 +601,14 @@ def service_call(self,
597601
tpe: Callable[[Any, I], Awaitable[O]],
598602
arg: I,
599603
idempotency_key: str | None = None,
600-
headers: typing.List[typing.Tuple[str, str]] | None = None
604+
headers: typing.Dict[str, str] | None = None
601605
) -> RestateDurableCallFuture[O]:
602606
coro = self.do_call(tpe, arg, idempotency_key=idempotency_key, headers=headers)
603607
assert not isinstance(coro, SendHandle)
604608
return coro
605609

606-
def service_send(self, tpe: Callable[[Any, I], Awaitable[O]], arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.List[typing.Tuple[str, str]] | None = None) -> SendHandle:
610+
611+
def service_send(self, tpe: Callable[[Any, I], Awaitable[O]], arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> SendHandle:
607612
send = self.do_call(tpe=tpe, parameter=arg, send_delay=send_delay, send=True, idempotency_key=idempotency_key, headers=headers)
608613
assert isinstance(send, SendHandle)
609614
return send
@@ -613,13 +618,13 @@ def object_call(self,
613618
key: str,
614619
arg: I,
615620
idempotency_key: str | None = None,
616-
headers: typing.List[typing.Tuple[str, str]] | None = None
621+
headers: typing.Dict[str, str] | None = None
617622
) -> RestateDurableCallFuture[O]:
618623
coro = self.do_call(tpe, arg, key, idempotency_key=idempotency_key, headers=headers)
619624
assert not isinstance(coro, SendHandle)
620625
return coro
621626

622-
def object_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.List[typing.Tuple[str, str]] | None = None) -> SendHandle:
627+
def object_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> SendHandle:
623628
send = self.do_call(tpe=tpe, key=key, parameter=arg, send_delay=send_delay, send=True, idempotency_key=idempotency_key, headers=headers)
624629
assert isinstance(send, SendHandle)
625630
return send
@@ -629,16 +634,16 @@ def workflow_call(self,
629634
key: str,
630635
arg: I,
631636
idempotency_key: str | None = None,
632-
headers: typing.List[typing.Tuple[str, str]] | None = None
637+
headers: typing.Dict[str, str] | None = None
633638
) -> RestateDurableCallFuture[O]:
634639
return self.object_call(tpe, key, arg, idempotency_key=idempotency_key, headers=headers)
635640

636-
def workflow_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.List[typing.Tuple[str, str]] | None = None) -> SendHandle:
641+
def workflow_send(self, tpe: Callable[[Any, I], Awaitable[O]], key: str, arg: I, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> SendHandle:
637642
send = self.object_send(tpe, key, arg, send_delay, idempotency_key=idempotency_key, headers=headers)
638643
assert isinstance(send, SendHandle)
639644
return send
640645

641-
def generic_call(self, service: str, handler: str, arg: bytes, key: str | None = None, idempotency_key: str | None = None, headers: typing.List[typing.Tuple[str, str]] | None = None) -> RestateDurableCallFuture[bytes]:
646+
def generic_call(self, service: str, handler: str, arg: bytes, key: str | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> RestateDurableCallFuture[bytes]:
642647
serde = BytesSerde()
643648
call_handle = self.do_raw_call(service=service,
644649
handler=handler,
@@ -651,7 +656,7 @@ def generic_call(self, service: str, handler: str, arg: bytes, key: str | None =
651656
assert not isinstance(call_handle, SendHandle)
652657
return call_handle
653658

654-
def generic_send(self, service: str, handler: str, arg: bytes, key: str | None = None, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.List[typing.Tuple[str, str]] | None = None) -> SendHandle:
659+
def generic_send(self, service: str, handler: str, arg: bytes, key: str | None = None, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> SendHandle:
655660
serde = BytesSerde()
656661
send_handle = self.do_raw_call(service=service,
657662
handler=handler,

0 commit comments

Comments
 (0)