Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/ti_opencti/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
# newer versions go on top
- version: "2.9.0"
changes:
- description: Add comprehensive filtering support for indicators including pattern types, confidence levels, labels, dates, authors, creators, and marking definitions.
type: enhancement
link: https://github.com/elastic/integrations/pull/15332
- description: Implement deduplication mechanism using fingerprint processor to prevent duplicate indicators when running multiple agents.
type: enhancement
link: https://github.com/elastic/integrations/pull/15332
- description: Add state management to track last modified timestamp and prevent re-fetching already processed indicators.
type: enhancement
link: https://github.com/elastic/integrations/pull/15332
- description: Update OpenCTI logos for better visual consistency.
type: enhancement
link: https://github.com/elastic/integrations/pull/15332
- version: "2.8.0"
changes:
- description: Add script processor to drop all nulls / empty strings.
Expand Down
128 changes: 111 additions & 17 deletions packages/ti_opencti/data_stream/indicator/agent/stream/cel.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,96 @@ fields:
url: {{url}}
program: |
request(
"POST",
state.url.trim_suffix("graphql").trim_suffix("/") + "/graphql"
).with({
"Header": ({
"Content-Type": ["application/json"]
}).with(
has(state.api_key) && size(state.api_key) > 0 ?
{ "Authorization": ["Bearer " + state.api_key] }
:
{}
)
}).with({
"Body": {
"query": state.query,
"variables": {
"POST",
state.url.trim_suffix("graphql").trim_suffix("/") + "/graphql"
).with({
"Header": ({
"Content-Type": ["application/json"]
}).with(
has(state.api_key) && size(state.api_key) > 0 ?
{ "Authorization": ["Bearer " + state.api_key] }
:
{}
)
}).with({
"Body": {
"query": state.query,
"variables": {
Comment on lines +40 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

"after": has(state.cursor) && has(state.cursor.value) ? state.cursor.value : null,
"first": state.page_size,
"orderBy": "modified",
"orderMode": "asc",
"filters": (
// Build the FilterGroup object
(
(has(state.pattern_types) && size(state.pattern_types) > 0) ||
(has(state.indicator_types) && size(state.indicator_types) > 0) ||
(has(state.revoked) && state.revoked != null) ||
(has(state.valid_from_start) && state.valid_from_start != null) ||
(has(state.valid_until_end) && state.valid_until_end != null) ||
(has(state.label_ids) && size(state.label_ids) > 0) ||
(has(state.confidence_min) && state.confidence_min != null) ||
(has(state.author_ids) && size(state.author_ids) > 0) ||
(has(state.creator_ids) && size(state.creator_ids) > 0) ||
(has(state.created_after) && state.created_after != null) ||
(has(state.modified_after) && state.modified_after != null) ||
(has(state.last_modified) && state.last_modified != null) ||
(has(state.marking_ids) && size(state.marking_ids) > 0)
) ?
{
"mode": "and",
"filters": (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done more readably with an expression mapping over the set of filters.

// Always filter for Indicator entity type
[{"key": "entity_type", "values": ["Indicator"], "operator": "eq"}] +
(has(state.pattern_types) && size(state.pattern_types) > 0 ?
[{"key": "pattern_type", "values": state.pattern_types, "operator": "eq", "mode": "or"}] : []
) +
(has(state.indicator_types) && size(state.indicator_types) > 0 ?
[{"key": "indicator_types", "values": state.indicator_types, "operator": "eq", "mode": "or"}] : []
) +
(has(state.revoked) && state.revoked != null ?
[{"key": "revoked", "values": [state.revoked == "true"], "operator": "eq"}] : []
) +
(has(state.valid_from_start) && state.valid_from_start != null ?
[{"key": "valid_from", "values": [state.valid_from_start], "operator": "gte"}] : []
) +
(has(state.valid_until_end) && state.valid_until_end != null ?
[{"key": "valid_until", "values": [state.valid_until_end], "operator": "lte"}] : []
) +
(has(state.label_ids) && size(state.label_ids) > 0 ?
[{"key": "objectLabel", "values": state.label_ids, "operator": "eq", "mode": "or"}] : []
) +
(has(state.confidence_min) && state.confidence_min != null ?
[{"key": "confidence", "values": [string(state.confidence_min)], "operator": "gte"}] : []
) +
(has(state.author_ids) && size(state.author_ids) > 0 ?
[{"key": "createdBy", "values": state.author_ids, "operator": "eq", "mode": "or"}] : []
) +
(has(state.creator_ids) && size(state.creator_ids) > 0 ?
[{"key": "creator_id", "values": state.creator_ids, "operator": "eq", "mode": "or"}] : []
) +
(has(state.created_after) && state.created_after != null ?
[{"key": "created", "values": [state.created_after], "operator": "gt"}] : []
) +
(has(state.last_modified) && state.last_modified != null ?
[{"key": "updated_at", "values": [state.last_modified], "operator": "gt"}] :
(has(state.modified_after) && state.modified_after != null ?
[{"key": "updated_at", "values": [state.modified_after], "operator": "gt"}] : []
)
) +
(has(state.marking_ids) && size(state.marking_ids) > 0 ?
[{"key": "objectMarking", "values": state.marking_ids, "operator": "eq", "mode": "or"}] : []
)
),
"filterGroups": []
} :
// Default filter: always filter for Indicator entity type
{
"mode": "and",
"filters": [{"key": "entity_type", "values": ["Indicator"], "operator": "eq"}],
"filterGroups": []
}
)
}
}.encode_json()
}).do_request().as(resp,
Expand All @@ -65,7 +136,8 @@ program: |
"events": [{
"error": { "message": body.errors.map(e, e.message) },
"event": { "original": body.encode_json() }
}]
}],
"last_modified": state.?last_modified.orValue(null)
})
:
state.with({
Expand All @@ -77,30 +149,52 @@ program: |
)),
"want_more": body.data.indicators.pageInfo.hasNextPage,
"cursor": { "value": body.data.indicators.pageInfo.endCursor },
"last_modified": has(body.data.indicators.edges) && body.data.indicators.edges.size() > 0 ?
body.data.indicators.edges.map(e, e.node.modified).max()
:
state.?last_modified.orValue(null)
})
)
)
)
redact:
fields:
- api_key
state:
url: {{url}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not required.

api_key: {{api_key}}
page_size: {{page_size}}
preserve_original_event: {{preserve_original_event}}
want_more: false
# Track last modified timestamp to avoid re-fetching
last_modified: null
# Filter configuration
pattern_types: {{#if pattern_types}}{{pattern_types}}{{else}}[]{{/if}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be in the pattern

Suggested change
pattern_types: {{#if pattern_types}}{{pattern_types}}{{else}}[]{{/if}}
{{#if pattern_types}}
pattern_types: {{pattern_types}}
{{/if}}

but to aid the simpler construction of the filter parameter, probably more like

Suggested change
pattern_types: {{#if pattern_types}}{{pattern_types}}{{else}}[]{{/if}}
{{#if pattern_types}}
filter.pattern_types: {{pattern_types}}
{{/if}}

indicator_types: {{#if indicator_types}}{{indicator_types}}{{else}}[]{{/if}}
revoked: {{#if revoked}}"{{revoked}}"{{else}}null{{/if}}
valid_from_start: {{#if valid_from_start}}"{{valid_from_start}}"{{else}}null{{/if}}
valid_until_end: {{#if valid_until_end}}"{{valid_until_end}}"{{else}}null{{/if}}
label_ids: {{#if label_ids}}{{label_ids}}{{else}}[]{{/if}}
confidence_min: {{#if confidence_min}}{{confidence_min}}{{else}}null{{/if}}
author_ids: {{#if author_ids}}{{author_ids}}{{else}}[]{{/if}}
creator_ids: {{#if creator_ids}}{{creator_ids}}{{else}}[]{{/if}}
created_after: {{#if created_after}}"{{created_after}}"{{else}}null{{/if}}
modified_after: {{#if modified_after}}"{{modified_after}}"{{else}}null{{/if}}
marking_ids: {{#if marking_ids}}{{marking_ids}}{{else}}[]{{/if}}
# How to work with this API: https://docs.opencti.io/latest/deployment/integrations/#graphql-api
# Relevant schema source: https://github.com/OpenCTI-Platform/opencti/blob/master/opencti-platform/opencti-graphql/config/schema/opencti.graphql
query: |

query IndicatorsLinesPaginationQuery(
$search: String
$filters: FilterGroup
$first: Int!
$after: ID
$orderBy: IndicatorsOrdering
$orderMode: OrderingMode
) {
indicators(
search: $search
filters: $filters
first: $first
after: $after
orderBy: $orderBy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ processors:
field: id
target_field: event.id

###################################
# Deduplication using fingerprint #
###################################

# Generate a deterministic ID based on the standard_id
# This ensures the same indicator always gets the same document ID
- fingerprint:
tag: generate_document_id_for_deduplication
fields:
- standard_id
target_field: _id
method: SHA-256
if: ctx.standard_id != null && ctx.standard_id != ''
description: Generate deterministic document ID from standard_id for deduplication
ignore_failure: false

######################
# Threat feed fields #
######################
Expand Down Expand Up @@ -868,6 +884,86 @@ processors:
ctx.threat.indicator.x509 = mergeListOfMaps(ctx.threat.indicator.x509);
}

###################################
# Fingerprint for deduplication #
###################################

# Generate a consistent document ID based on the indicator's unique STIX ID
# This prevents duplicates when multiple agents fetch the same data
- fingerprint:
fields:
- opencti.indicator.standard_id # STIX ID is globally unique and consistent
target_field: "_id"
ignore_missing: false
description: Generate consistent document ID for deduplication across multiple agents

###########################################################
# Tag indicators suitable for Security rule creation #
###########################################################

- set:
field: opencti.indicator.rule_compatible
value: true
if: |
ctx.opencti?.indicator?.pattern_type != null && (
ctx.opencti.indicator.pattern_type == 'kql' ||
ctx.opencti.indicator.pattern_type == 'lucene' ||
ctx.opencti.indicator.pattern_type == 'eql' ||
ctx.opencti.indicator.pattern_type == 'esql'
) && ctx.opencti?.indicator?.revoked != true
description: Mark indicators that can be converted to detection rules

- append:
field: tags
value: detection-rule-candidate
if: ctx.opencti?.indicator?.rule_compatible == true
allow_duplicates: false

- set:
field: opencti.indicator.detection_rule.type
value: query
if: ctx.opencti?.indicator?.pattern_type == 'kql' || ctx.opencti?.indicator?.pattern_type == 'lucene'

- set:
field: opencti.indicator.detection_rule.type
value: eql
if: ctx.opencti?.indicator?.pattern_type == 'eql'

- set:
field: opencti.indicator.detection_rule.type
value: esql
if: ctx.opencti?.indicator?.pattern_type == 'esql'

- set:
field: opencti.indicator.detection_rule.query
copy_from: opencti.indicator.pattern
if: ctx.opencti?.indicator?.rule_compatible == true

- script:
description: Set detection rule severity based on confidence
lang: painless
if: ctx.opencti?.indicator?.rule_compatible == true
source: |
if (ctx.opencti?.indicator?.score != null) {
int score = ctx.opencti.indicator.score;
if (score >= 80) {
ctx.opencti.indicator.detection_rule.severity = 'critical';
ctx.opencti.indicator.detection_rule.risk_score = 90;
} else if (score >= 60) {
ctx.opencti.indicator.detection_rule.severity = 'high';
ctx.opencti.indicator.detection_rule.risk_score = 70;
} else if (score >= 40) {
ctx.opencti.indicator.detection_rule.severity = 'medium';
ctx.opencti.indicator.detection_rule.risk_score = 50;
} else {
ctx.opencti.indicator.detection_rule.severity = 'low';
ctx.opencti.indicator.detection_rule.risk_score = 30;
}
} else {
ctx.opencti.indicator.detection_rule.severity = 'medium';
ctx.opencti.indicator.detection_rule.risk_score = 50;
}

#######################################################
# Tag to show if ECS indicator details were populated #
#######################################################
Expand Down
Loading