Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
90fa8fd
Fix rendering of template fields with start from trigger
dabla Feb 2, 2026
5f7306d
refactor: Check if TaskInstance exists or not in BaseTrigger
dabla Feb 2, 2026
2fddf07
Revert "refactor: Check if TaskInstance exists or not in BaseTrigger"
dabla Feb 2, 2026
a21201f
refactor: Changed return type of task_instance property in BaseTrigger
dabla Feb 2, 2026
937a379
refactor: Make sure default values for start from trigger can be over…
dabla Feb 6, 2026
6819e63
refactor: Remove assert on start_date of TaskInstance
dabla Feb 6, 2026
f5dd331
refactor: Make sure to check if dag_data is not None in workloads bef…
dabla Feb 6, 2026
ed0594b
refactor: Only pass serialized dag model to workload if trigger conta…
dabla Feb 6, 2026
ec34481
refactor: Don't invoke _read_dag twice in get_dag method of DBDagBag …
dabla Feb 6, 2026
df75780
refactor: Don't invoke _read_dag twice in get_dag method of DBDagBag …
dabla Feb 6, 2026
23d7aea
refactor: Make _version_from_dag_run method of DBDagBag failsafe for …
dabla Feb 6, 2026
6684575
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 6, 2026
6c28707
refactor: Moved None check on start_state together with the task in o…
dabla Feb 6, 2026
96d1893
Revert "refactor: Make _version_from_dag_run method of DBDagBag fails…
dabla Feb 6, 2026
4ab221e
refactor: Fixed test_get_dag_model
dabla Feb 6, 2026
1da2ca1
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 6, 2026
0b4a36a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 7, 2026
67bfa1e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 7, 2026
bb80bfc
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 7, 2026
558c88a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 9, 2026
d5d339f
refactor: Only pass serialized Dag model data to RunTrigger if start_…
dabla Feb 9, 2026
c7ce525
refactor: Added docstrings for start_from_trigger and start_trigger_args
dabla Feb 9, 2026
f631b37
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 9, 2026
499f463
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 10, 2026
259ae7b
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
a7e7c69
refactor: Templated field must be checked on task of task instance
dabla Feb 11, 2026
a019832
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
b18e942
refactor: Added start_from_trigger property on Trigger
dabla Feb 11, 2026
2dc4f4b
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
337b22f
refactor: Reformatted trigger unit test
dabla Feb 11, 2026
c2de568
refactor: Only the RuntimeTaskInstance has the task attribute, the ge…
dabla Feb 11, 2026
02e3413
refactor: Reformatted test trigger
dabla Feb 11, 2026
064e59e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
9010f1c
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
a3a9964
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 12, 2026
af9553d
Update airflow-core/src/airflow/serialization/definitions/mappedopera…
dabla Feb 13, 2026
40b9b2d
refactor: Removed obsolete run method from TaskInstance
dabla Feb 13, 2026
5042c3c
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 13, 2026
3e92f47
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 23, 2026
f3ca48a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 24, 2026
85ac14d
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
3041c11
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
56f8cec
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
cc9b2cc
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
f3ffb51
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
c2c98b7
refactor: Added dag_data field to RunTrigger and made ti field optional
dabla Mar 4, 2026
1a85868
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 4, 2026
76ee5a6
refactor: Reformatted RunTrigger
dabla Mar 4, 2026
2834afc
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
d19f690
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
399c94b
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
6790e65
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
5aace8f
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
5331992
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 12, 2026
39f0782
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 12, 2026
75dfc99
refactor: We cannot detect if a Trigger has a task associated with a …
dabla Mar 12, 2026
1436811
refactor: Re-added check on start_from_trigger from serialized Dag
dabla Mar 12, 2026
97b4d45
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 17, 2026
bac2d24
refactor: Fixed call to dag_bag in get_dag_for_run_or_latest_version …
dabla Mar 17, 2026
4796335
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 17, 2026
8e38dcf
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 17, 2026
19f975b
refactor: Extracted _do_render_template_fields method into Template s…
dabla Mar 19, 2026
869ba37
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 19, 2026
1d0c6fd
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 19, 2026
2684561
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 19, 2026
c2f8271
refactor: task_id should be an instance field instead of property
dabla Mar 20, 2026
8717575
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
f7fa0b0
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
cd23b0c
refactor: Added tests for _do_render_template_fields method in TestTe…
dabla Mar 20, 2026
66b94f5
refactor: Fixed templater unit tests
dabla Mar 20, 2026
f5e3289
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
2bfafa1
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
0f564af
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
e30f00e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 24, 2026
e22575a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 25, 2026
f2decd4
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 25, 2026
eff4fbd
refactor: Raise NotImplementError in _set_context
dabla Mar 25, 2026
129e390
refactor: Reverted logging back to structlog in mappedoperator
dabla Mar 25, 2026
8eb0042
refactor: Refactored _create_workload in trigger job runner
dabla Mar 25, 2026
7e5c062
refactor: Renamed get_dag_model to get_serialized_dag_model in DBDagBag
dabla Mar 25, 2026
20f7ac0
refactor: Refactored templater using structlog
dabla Mar 25, 2026
856c496
refactor: Added docstring to get_serialized_dag_model
dabla Mar 25, 2026
398265e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 25, 2026
e2ebeeb
Revert "refactor: Raise NotImplementError in _set_context"
dabla Mar 25, 2026
169e5d9
refactor: Fixed typing of render_log_fname
dabla Mar 25, 2026
1b856ae
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 25, 2026
55ed71d
Revert "refactor: Refactored templater using structlog"
dabla Mar 25, 2026
68a3d70
refactor: Reformatted files
dabla Mar 25, 2026
937f808
refactor: Removed new line in get_serialized_dag_model
dabla Mar 25, 2026
0ad75bf
refactor: Fixed test_get_dag_returns_none_when_model_missing
dabla Mar 25, 2026
3d7c35e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 25, 2026
fbaec4e
refactor: Removed default NEW_SESSION from session parameter in _crea…
dabla Mar 25, 2026
ade3a17
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ repos:
^src/airflow/timetables/assets\.py$|
^src/airflow/timetables/base\.py$|
^src/airflow/timetables/simple\.py$|
^src/airflow/triggers/base\.py$|
^src/airflow/utils/cli\.py$|
^src/airflow/utils/context\.py$|
^src/airflow/utils/dag_cycle_tester\.py$|
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/common/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def get_dag_for_run_or_latest_version(
dag: SerializedDAG | None = None
if dag_run:
if dag_run.created_dag_version_id:
dag = dag_bag._get_dag(dag_run.created_dag_version_id, session=session)
dag = dag_bag.get_dag(dag_run.created_dag_version_id, session=session)
if not dag:
dag = dag_bag.get_dag_for_run(dag_run, session=session)
elif dag_id:
Expand Down
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/executors/workloads/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ class RunTrigger(BaseModel):
"""

id: int
ti: TaskInstanceDTO | None # Could be none for asset-based triggers.
classpath: str # Dot-separated name of the module+fn to import and run this workload.
encrypted_kwargs: str
ti: TaskInstanceDTO | None = None # Could be none for asset-based triggers.
timeout_after: datetime | None = None
type: Literal["RunTrigger"] = Field(init=False, default="RunTrigger")
dag_data: dict | None = (
None # Serialized Dag model in dict format so it can be deserialized in trigger subprocess.
)
200 changes: 139 additions & 61 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sys
import time
from collections import deque
from collections.abc import Generator, Iterable
from collections.abc import Callable, Generator, Iterable
from contextlib import suppress
from datetime import datetime
from socket import socket
Expand All @@ -51,6 +51,7 @@
from airflow.executors.workloads.task import TaskInstanceDTO
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.dagbag import DBDagBag
from airflow.models.trigger import Trigger
from airflow.observability.metrics import stats_utils
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
Expand Down Expand Up @@ -84,10 +85,12 @@
_RequestFrame,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.serialization.serialized_objects import DagSerialization
from airflow.triggers.base import BaseEventTrigger, BaseTrigger, DiscrimatedTriggerEvent, TriggerEvent
from airflow.utils.helpers import log_filename_template_renderer
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.session import create_session, provide_session

if TYPE_CHECKING:
from opentelemetry.util._decorator import _AgnosticContextManager
Expand All @@ -97,6 +100,7 @@
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
from airflow.jobs.job import Job
from airflow.sdk.api.client import Client
from airflow.sdk.definitions.context import Context
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -658,6 +662,65 @@ def emit_metrics(self):
extra_tags={"hostname": self.job.hostname},
)

def _create_workload(
self,
trigger: Trigger,
dag_bag: DBDagBag,
render_log_fname: Callable[..., str],
session: Session,
) -> workloads.RunTrigger | None:
if trigger.task_instance is None:
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
encrypted_kwargs=trigger.encrypted_kwargs,
)

if not trigger.task_instance.dag_version_id:
# This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
log.warning(
"TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
ti_id=trigger.task_instance.id,
)
return None

log_path = render_log_fname(ti=trigger.task_instance)
ser_ti = TaskInstanceDTO.model_validate(trigger.task_instance, from_attributes=True)

# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[trigger.id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

serialized_dag_model = dag_bag.get_serialized_dag_model(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_serialized_dag_model() and the subsequent serialized_dag_model.dag.get_task() deserialization runs for every trigger with a task instance, not just ones with start_from_trigger=True. Most triggers (deferred sensors, etc.) don't use start_from_trigger, so this adds unnecessary DB and CPU overhead for the common case.

Consider adding a lightweight indicator (e.g., a boolean flag on the Trigger model or TI) so you can skip the DAG load entirely when start_from_trigger isn't in play.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will be done in yet another PR.

version_id=trigger.task_instance.dag_version_id,
session=session,
)

if serialized_dag_model:
task = serialized_dag_model.dag.get_task(trigger.task_instance.task_id)

# When a TaskInstance of a Trigger contains a task with start_from_trigger enabled,
# it means we need to load the SerializedDagModel so we can build a RuntimeTaskInstance later on which
# will allow us to build a context on which we will render the templated fields.
if task.start_from_trigger:
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
encrypted_kwargs=trigger.encrypted_kwargs,
ti=ser_ti,
timeout_after=trigger.task_instance.trigger_timeout,
dag_data=serialized_dag_model.data,
)
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
encrypted_kwargs=trigger.encrypted_kwargs,
ti=ser_ti,
timeout_after=trigger.task_instance.trigger_timeout,
)

def update_triggers(self, requested_trigger_ids: set[int]):
"""
Request that we update what triggers we're running.
Expand All @@ -666,8 +729,8 @@ def update_triggers(self, requested_trigger_ids: set[int]):
adds them to the dequeues so the subprocess can actually mutate the running
trigger set.
"""
dag_bag = DBDagBag()
render_log_fname = log_filename_template_renderer()

known_trigger_ids = (
self.running_triggers.union(x[0] for x in self.events)
.union(self.cancelling_triggers)
Expand All @@ -678,60 +741,48 @@ def update_triggers(self, requested_trigger_ids: set[int]):
new_trigger_ids = requested_trigger_ids - known_trigger_ids
cancel_trigger_ids = self.running_triggers - requested_trigger_ids
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids)
trigger_ids_with_non_task_associations = Trigger.fetch_trigger_ids_with_non_task_associations()
to_create: list[workloads.RunTrigger] = []
# Add in new triggers
for new_id in new_trigger_ids:
# Check it didn't vanish in the meantime
if new_id not in new_triggers:
log.warning("Trigger disappeared before we could start it", id=new_id)
continue

new_trigger_orm = new_triggers[new_id]

# If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance
# row was updated by either Trigger.submit_event or Trigger.submit_failure
# and can happen when a single trigger Job is being run on multiple TriggerRunners
# in a High-Availability setup.
if new_trigger_orm.task_instance is None and new_id not in trigger_ids_with_non_task_associations:
log.info(
(
"TaskInstance Trigger is None. It was likely updated by another trigger job. "
"Skipping trigger instantiation."
),
id=new_id,
)
continue

workload = workloads.RunTrigger(
classpath=new_trigger_orm.classpath,
id=new_id,
encrypted_kwargs=new_trigger_orm.encrypted_kwargs,
ti=None,
with create_session() as session:
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids, session=session)
trigger_ids_with_non_task_associations = Trigger.fetch_trigger_ids_with_non_task_associations(
session=session
)
if new_trigger_orm.task_instance:
log_path = render_log_fname(ti=new_trigger_orm.task_instance)
if not new_trigger_orm.task_instance.dag_version_id:
# This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
log.warning(
"TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
ti_id=new_trigger_orm.task_instance.id,
)
to_create: list[workloads.RunTrigger] = []
# Add in new triggers
for new_trigger_id in new_trigger_ids:
# Check it didn't vanish in the meantime
if new_trigger_id not in new_triggers:
log.warning("Trigger disappeared before we could start it", id=new_trigger_id)
continue
ser_ti = TaskInstanceDTO.model_validate(new_trigger_orm.task_instance, from_attributes=True)
# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[new_id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

workload.ti = ser_ti
workload.timeout_after = new_trigger_orm.task_instance.trigger_timeout
new_trigger_orm = new_triggers[new_trigger_id]

# If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance
# row was updated by either Trigger.submit_event or Trigger.submit_failure
# and can happen when a single trigger Job is being run on multiple TriggerRunners
# in a High-Availability setup.
if (
new_trigger_orm.task_instance is None
and new_trigger_id not in trigger_ids_with_non_task_associations
):
log.info(
(
"TaskInstance of Trigger is None. It was likely updated by another trigger job. "
"Skipping trigger instantiation."
),
id=new_trigger_id,
)
continue

to_create.append(workload)
if workload := self._create_workload(
trigger=new_trigger_orm,
dag_bag=dag_bag,
render_log_fname=render_log_fname,
session=session,
):
to_create.append(workload)

self.creating_triggers.extend(to_create)
self.creating_triggers.extend(to_create)

if cancel_trigger_ids:
# Enqueue orphaned triggers for cancellation
Expand Down Expand Up @@ -986,9 +1037,19 @@ async def init_comms(self):
raise RuntimeError(f"Required first message to be a messages.StartTriggerer, it was {msg}")

async def create_triggers(self):
def create_runtime_ti(encoded_dag: dict) -> RuntimeTaskInstance:
task = DagSerialization.from_dict(encoded_dag).get_task(workload.ti.task_id)

# I need to recreate a TaskInstance from task_runner before invoking get_template_context (airflow.executors.workloads.TaskInstance)
return RuntimeTaskInstance.model_construct(
**workload.ti.model_dump(exclude_unset=True),
task=task,
)

"""Drain the to_create queue and create all new triggers that have been requested in the DB."""
while self.to_create:
await asyncio.sleep(0)
context: Context | None = None
workload = self.to_create.popleft()
trigger_id = workload.id
if trigger_id in self.triggers:
Expand Down Expand Up @@ -1016,24 +1077,32 @@ async def create_triggers(self):
# that could cause None values in collections.
kw = Trigger._decrypt_kwargs(workload.encrypted_kwargs)
deserialised_kwargs = {k: smart_decode_trigger_kwargs(v) for k, v in kw.items()}
trigger_instance = trigger_class(**deserialised_kwargs)

if ti := workload.ti:
trigger_name = f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} (ID {trigger_id})"
trigger_instance = trigger_class(**deserialised_kwargs)

if workload.dag_data:
runtime_ti = create_runtime_ti(workload.dag_data)
context = runtime_ti.get_template_context()
trigger_instance.task_instance = runtime_ti
else:
trigger_instance.task_instance = ti
else:
trigger_name = f"ID {trigger_id}"
trigger_instance = trigger_class(**deserialised_kwargs)
except TypeError as err:
self.log.error("Trigger failed to inflate", error=err)
self.failed_triggers.append((trigger_id, err))
continue
trigger_instance.trigger_id = trigger_id
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.task_instance = ti = workload.ti
trigger_instance.timeout_after = workload.timeout_after

trigger_name = (
f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} (ID {trigger_id})"
if ti
else f"ID {trigger_id}"
)
self.triggers[trigger_id] = {
"task": asyncio.create_task(
self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name
self.run_trigger(trigger_id, trigger_instance, workload.timeout_after, context),
name=trigger_name,
),
"is_watcher": isinstance(trigger_instance, BaseEventTrigger),
"name": trigger_name,
Expand Down Expand Up @@ -1200,7 +1269,13 @@ async def block_watchdog(self):
)
Stats.incr("triggers.blocked_main_thread")

async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
async def run_trigger(
self,
trigger_id: int,
trigger: BaseTrigger,
timeout_after: datetime | None = None,
context: Context | None = None,
):
"""Run a trigger (they are async generators) and push their events into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
import greenback
Expand All @@ -1213,6 +1288,9 @@ async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after
self.log.info("trigger %s starting", name)
with _make_trigger_span(ti=trigger.task_instance, trigger_id=trigger_id, name=name) as span:
try:
if context is not None:
trigger.render_template_fields(context=context)

async for event in trigger.run():
await self.log.ainfo(
"Trigger fired event", name=self.triggers[trigger_id]["name"], result=event
Expand Down
Loading
Loading