Skip to content

Commit 0647489

Browse files
committed
Extract shared code, add additional conditionals
1 parent 07d4876 commit 0647489

File tree

1 file changed

+43
-39
lines changed

1 file changed

+43
-39
lines changed

durabletask/worker.py

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from threading import Event, Thread
1313
from types import GeneratorType
1414
from enum import Enum
15-
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
15+
from typing import Any, Generator, Optional, Sequence, Tuple, TypeVar, Union
1616
from packaging.version import InvalidVersion, parse
1717

1818
import grpc
@@ -1592,41 +1592,11 @@ def process_event(
15921592
raise TypeError("Unexpected sub-orchestration task type")
15931593
elif event.HasField("eventRaised"):
15941594
if event.eventRaised.name in ctx._entity_task_id_map:
1595-
# This eventRaised represents the result of an entity operation after being translated to the old
1596-
# entity protocol by the Durable WebJobs extension
15971595
entity_id, task_id = ctx._entity_task_id_map.get(event.eventRaised.name, (None, None))
1598-
if entity_id is None:
1599-
raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'")
1600-
if task_id is None:
1601-
raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'")
1602-
entity_task = ctx._pending_tasks.pop(task_id, None)
1603-
if not entity_task:
1604-
raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'")
1605-
result = None
1606-
if not ph.is_empty(event.eventRaised.input):
1607-
# TODO: Investigate why the event result is wrapped in a dict with "result" key
1608-
result = shared.from_json(event.eventRaised.input.value)["result"]
1609-
ctx._entity_context.recover_lock_after_call(entity_id)
1610-
entity_task.complete(result)
1611-
ctx.resume()
1596+
self._handle_entity_event_raised(ctx, event, entity_id, task_id, False)
16121597
elif event.eventRaised.name in ctx._entity_lock_task_id_map:
1613-
# This eventRaised represents the result of an entity operation after being translated to the old
1614-
# entity protocol by the Durable WebJobs extension
16151598
entity_id, task_id = ctx._entity_lock_task_id_map.get(event.eventRaised.name, (None, None))
1616-
if entity_id is None:
1617-
raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'")
1618-
if task_id is None:
1619-
raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'")
1620-
entity_task = ctx._pending_tasks.pop(task_id, None)
1621-
if not entity_task:
1622-
raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'")
1623-
result = None
1624-
if not ph.is_empty(event.eventRaised.input):
1625-
# TODO: Investigate why the event result is wrapped in a dict with "result" key
1626-
result = shared.from_json(event.eventRaised.input.value)["result"]
1627-
ctx._entity_context.complete_acquire(event.eventRaised.name)
1628-
entity_task.complete(EntityLock(ctx))
1629-
ctx.resume()
1599+
self._handle_entity_event_raised(ctx, event, entity_id, task_id, True)
16301600
else:
16311601
# event names are case-insensitive
16321602
event_name = event.eventRaised.name.casefold()
@@ -1791,15 +1761,11 @@ def process_event(
17911761
action = ctx._pending_actions.pop(event.eventId, None)
17921762
if action and action.HasField("sendEntityMessage"):
17931763
if action.sendEntityMessage.HasField("entityOperationCalled"):
1794-
entity_id = EntityInstanceId.parse(event.eventSent.instanceId)
1795-
event_id = json.loads(event.eventSent.input.value)["id"]
1764+
entity_id, event_id = self._parse_entity_event_sent_input(event)
17961765
ctx._entity_task_id_map[event_id] = (entity_id, event.eventId)
17971766
elif action.sendEntityMessage.HasField("entityLockRequested"):
1798-
entity_id = EntityInstanceId.parse(event.eventSent.instanceId)
1799-
event_id = json.loads(event.eventSent.input.value)["id"]
1767+
entity_id, event_id = self._parse_entity_event_sent_input(event)
18001768
ctx._entity_lock_task_id_map[event_id] = (entity_id, event.eventId)
1801-
else:
1802-
return
18031769
else:
18041770
eventType = event.WhichOneof("eventType")
18051771
raise task.OrchestrationStateError(
@@ -1809,6 +1775,44 @@ def process_event(
18091775
# The orchestrator generator function completed
18101776
ctx.set_complete(generatorStopped.value, pb.ORCHESTRATION_STATUS_COMPLETED)
18111777

1778+
def _parse_entity_event_sent_input(self, event: pb.HistoryEvent) -> Tuple[EntityInstanceId, str]:
1779+
try:
1780+
entity_id = EntityInstanceId.parse(event.eventSent.instanceId)
1781+
except ValueError:
1782+
raise RuntimeError(f"Could not parse entity ID from instanceId '{event.eventSent.instanceId}'")
1783+
try:
1784+
event_id = json.loads(event.eventSent.input.value)["id"]
1785+
except (json.JSONDecodeError, KeyError, TypeError) as ex:
1786+
raise RuntimeError(f"Could not parse event ID from eventSent input '{event.eventSent.input.value}'") from ex
1787+
return entity_id, event_id
1788+
1789+
def _handle_entity_event_raised(self,
1790+
ctx: _RuntimeOrchestrationContext,
1791+
event: pb.HistoryEvent,
1792+
entity_id: Optional[EntityInstanceId],
1793+
task_id: Optional[int],
1794+
is_lock_event: bool):
1795+
# This eventRaised represents the result of an entity operation after being translated to the old
1796+
# entity protocol by the Durable WebJobs extension
1797+
if entity_id is None:
1798+
raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'")
1799+
if task_id is None:
1800+
raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'")
1801+
entity_task = ctx._pending_tasks.pop(task_id, None)
1802+
if not entity_task:
1803+
raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'")
1804+
result = None
1805+
if not ph.is_empty(event.eventRaised.input):
1806+
# TODO: Investigate why the event result is wrapped in a dict with "result" key
1807+
result = shared.from_json(event.eventRaised.input.value)["result"]
1808+
if is_lock_event:
1809+
ctx._entity_context.complete_acquire(event.eventRaised.name)
1810+
entity_task.complete(EntityLock(ctx))
1811+
else:
1812+
ctx._entity_context.recover_lock_after_call(entity_id)
1813+
entity_task.complete(result)
1814+
ctx.resume()
1815+
18121816
def evaluate_orchestration_versioning(self, versioning: Optional[VersioningOptions], orchestration_version: Optional[str]) -> Optional[pb.TaskFailureDetails]:
18131817
if versioning is None:
18141818
return None

0 commit comments

Comments
 (0)