Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/sentry/features/temporary.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ def register_temporary_features(manager: FeatureManager) -> None:
manager.add("organizations:workflow-engine-metric-alert-dual-processing-logs", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable Creation of Metric Alerts that use the `group_by` field in the workflow_engine
manager.add("organizations:workflow-engine-metric-alert-group-by-creation", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable caching for workflow action filters
manager.add("organizations:workflow-engine-action-filters-cache", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable ingestion through trusted relays only
manager.add("organizations:ingest-through-trusted-relays-only", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
# Enable metric issue UI for issue alerts
Expand Down
25 changes: 19 additions & 6 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from sentry.models.environment import Environment
from sentry.services.eventstore.models import GroupEvent
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient, DelayedWorkflowItem
from sentry.workflow_engine.caches.action_filters import get_action_filters_by_workflows
from sentry.workflow_engine.caches.workflow import get_workflows_by_detectors
from sentry.workflow_engine.models import DataConditionGroup, Detector, DetectorWorkflow, Workflow
from sentry.workflow_engine.models.data_condition import DataCondition
Expand Down Expand Up @@ -269,12 +270,24 @@ def evaluate_workflows_action_filters(
queue_items_by_workflow.keys()
)

action_conditions_to_workflow: dict[DataConditionGroup, Workflow] = {
wdcg.condition_group: wdcg.workflow
for wdcg in WorkflowDataConditionGroup.objects.select_related(
"workflow", "condition_group"
).filter(workflow__in=all_workflows)
}
organization = event_data.event.project.organization

action_conditions_to_workflow: dict[DataConditionGroup, Workflow] = {}

if features.has("organizations:workflow-engine-action-filters-cache", organization):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mind adding a test to the processing method to make sure that this is working as expected when it's enabled? (that should help guard against any rollout concerns and flipping the switch later. we would ideally be testing the majority of the logic in the tests for evaluating action filters)

all_workflows_lookup: dict[int, Workflow] = {w.id: w for w in all_workflows}
action_filters_by_workflows = get_action_filters_by_workflows(all_workflows)

for workflow_id, dcgs in action_filters_by_workflows.items():
for dcg in dcgs:
action_conditions_to_workflow[dcg] = all_workflows_lookup[workflow_id]
else:
action_conditions_to_workflow = {
wdcg.condition_group: wdcg.workflow
for wdcg in WorkflowDataConditionGroup.objects.select_related(
"workflow", "condition_group"
).filter(workflow__in=all_workflows)
}

filtered_action_groups: set[DataConditionGroup] = set()

Expand Down
36 changes: 36 additions & 0 deletions tests/sentry/workflow_engine/processors/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,16 @@ def test_evaluation_stats_add(self) -> None:
b = EvaluationStats(tainted=3, untainted=4)
assert a + b == EvaluationStats(tainted=4, untainted=6)

# Temporary test to exercise all evaluate_workflows_action_filters paths
# with caching enabled
def test_action_filter_stats_excludes_delayed_workflows__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_action_filter_stats_excludes_delayed_workflows()

def test_action_filter_stats_from_trigger_result__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_action_filter_stats_from_trigger_result()


@freeze_time(FROZEN_TIME)
class TestWorkflowEnqueuing(BaseWorkflowTest):
Expand Down Expand Up @@ -1127,6 +1137,32 @@ def test_enqueues_when_slow_conditions(self) -> None:
)
assert list(project_ids.keys()) == [self.project.id]

# Temporary tests to exercise all evaluate_workflows_action_filters paths
# with caching enabled
def test_activity__with_slow_conditions__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_activity__with_slow_conditions()

def test_enqueues_when_slow_conditions__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_enqueues_when_slow_conditions()

def test_with_slow_conditions__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_with_slow_conditions()

def test_basic__with_filter__filtered__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_basic__with_filter__filtered()

def test_basic__with_filter__passes__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_basic__with_filter__passes()

def test_basic__no_filter__with_cache(self) -> None:
with self.feature("organizations:workflow-engine-action-filters-cache"):
self.test_basic__no_filter()


class TestEnqueueWorkflows(BaseWorkflowTest):
def setUp(self) -> None:
Expand Down
Loading