Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions airflow-core/src/airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,26 @@ def task_instance(self, value: TaskInstance | None) -> None:
if self.task_instance:
self.task_id = self.task_instance.task_id
if self.task:
self.template_fields = self.task.template_fields
self.template_ext = self.task.template_ext
# Only keep operator template_fields that are also keys in
# start_trigger_args.trigger_kwargs *and* exist on the trigger.
# Using the full operator template_fields would cause
# AttributeError when the trigger does not have attributes with
# the same names as the operator (e.g. "bash_command").
#
# When start_trigger_args is None (normal defer path), the triggerer
# does not build a template context, so render_template_fields is
# never called and empty template_fields is safe.
start_trigger_args = getattr(self.task, "start_trigger_args", None)
trigger_kwarg_keys = (
set((start_trigger_args.trigger_kwargs or {}).keys()) if start_trigger_args else set()
)
if trigger_kwarg_keys:
self.template_fields = tuple(
f for f in self.task.template_fields if f in trigger_kwarg_keys and hasattr(self, f)
)
else:
self.template_fields = ()

def render_template_fields(
self,
Expand All @@ -127,7 +145,8 @@ def render_template_fields(
"""
if not jinja_env:
jinja_env = self.get_template_env()
# We only need to render templated fields if templated fields are part of the start_trigger_args
# self.template_fields is already filtered (in the task_instance setter) to only
# include fields present in start_trigger_args.trigger_kwargs and on this trigger.
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())

@abc.abstractmethod
Expand Down
71 changes: 71 additions & 0 deletions airflow-core/tests/unit/triggers/test_base_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ class DummyOperator(BaseOperator):
template_fields = ("name",)


class OperatorWithExtraTemplateFields(BaseOperator):
"""Operator whose template_fields do NOT all exist on the trigger."""

template_fields = ("bash_command", "env", "name")

def __init__(self, bash_command="", env=None, name="", **kwargs):
super().__init__(**kwargs)
self.bash_command = bash_command
self.env = env
self.name = name


class DummyTrigger(BaseTrigger):
def __init__(self, name: str, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -67,3 +79,62 @@ def test_render_template_fields(create_task_instance):
trigger.render_template_fields(context={"name": "world"})

assert trigger.name == "Hello world"


@pytest.mark.db_test
def test_render_template_fields_filters_to_trigger_kwargs(create_task_instance):
"""Only fields present in both trigger_kwargs and on the trigger should be rendered.

Operator template_fields like 'bash_command' and 'env' that don't exist on the
trigger must be excluded to avoid AttributeError.
"""
op = OperatorWithExtraTemplateFields(
task_id="extra_fields_task",
bash_command="echo hello",
env={"KEY": "val"},
name="static",
)
ti = create_task_instance(
task=op,
start_from_trigger=True,
start_trigger_args=StartTriggerArgs(
trigger_cls=f"{DummyTrigger.__module__}.{DummyTrigger.__qualname__}",
next_method="resume_method",
trigger_kwargs={"name": "Hello {{ name }}"},
),
)

trigger = DummyTrigger(name="Hello {{ name }}")
trigger.task_instance = ti

# Only 'name' should be in template_fields; 'bash_command' and 'env' are excluded
# because they aren't keys in trigger_kwargs or don't exist on the trigger.
assert trigger.template_fields == ("name",)

# Rendering must not raise AttributeError for missing operator fields
trigger.render_template_fields(context={"name": "world"})
assert trigger.name == "Hello world"


@pytest.mark.db_test
def test_render_template_fields_empty_when_no_trigger_kwargs(create_task_instance):
"""When start_trigger_args has no trigger_kwargs, template_fields should be empty."""
op = DummyOperator(task_id="no_kwargs_task")
ti = create_task_instance(
task=op,
start_from_trigger=True,
start_trigger_args=StartTriggerArgs(
trigger_cls=f"{DummyTrigger.__module__}.{DummyTrigger.__qualname__}",
next_method="resume_method",
trigger_kwargs=None,
),
)

trigger = DummyTrigger(name="Hello {{ name }}")
trigger.task_instance = ti

assert trigger.template_fields == ()

# Rendering with empty template_fields is a no-op
trigger.render_template_fields(context={"name": "world"})
assert trigger.name == "Hello {{ name }}"
Loading