Skip to content

[New Rules] External Promotion Alerts #4903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Our rules should be written generically when possible. We use [Elastic Common Sc

If the relevant [categorization values](https://www.elastic.co/guide/en/ecs/current/ecs-category-field-values-reference.html) are already defined for ECS, we use these to narrow down the event type before adding the query. Typically, the query starts with the broadest grouping possible and gets narrower for each clause. For example, we might write `event.category:process and event.type:start and process.name:net.exe and process.args:group`. First, we match process events with `event.category`, then narrow to creation events with `event.type`. Of the process creation events, we're looking for the process `net.exe` with `process.name` and finally we check the arguments `group` by looking at `process.args`. This flow has little effect on the generated Elasticsearch query, but is the most intuitive to read for rule developers.

Sometimes, it might not make sense for ECS to standardize a field, value, or category. Occasionally, we may encounter fields that specific to a single use-case or vendor. When that happens, we add an exception in [detection_rules/etc/non-ecs-schema.json](detection_rules/etc/non-ecs-schema.json). We automatically detect beats by looking at the index patterns used in a rule. If we see `winlogbeat-*`, for example, then we can validate the rule against ECS + Winlogbeat. When using a particular beat, please use `event.module` and `event.dataset` to make the rule more precise and to better nudge the validation logic. Similar to our logic flow for ECS categorization, we recommend searches progress from `event.module` → `event.dataset` → `event.action` → `<additional criteria>`.
Sometimes, it might not make sense for ECS to standardize a field, value, or category. Occasionally, we may encounter fields that specific to a single use-case or vendor. When that happens, we add an exception in [detection_rules/etc/non-ecs-schema.json](detection_rules/etc/non-ecs-schema.json). We automatically detect beats by looking at the index patterns used in a rule. If we see `winlogbeat-*`, for example, then we can validate the rule against ECS + Winlogbeat. When using a particular beat, please use `event.module` and `data_stream.dataset` to make the rule more precise and to better nudge the validation logic. Similar to our logic flow for ECS categorization, we recommend searches progress from `event.module` → `data_stream.dataset` → `event.action` → `<additional criteria>`.

When a Pull Request is missing a necessary ECS change, please add an issue to [elastic/ecs](https://github.com/elastic/ecs) and link it from the pull request. We don't want to leave PRs blocked for too long, so if the ECS issue isn't progressing, then we can add a note and use the vendor- or beat-specific fields. We'll create another issue, reminding us to update the rule logic to switch to the ECS field when it becomes available. To maximize compatibility, we may add an `or` clause for a release or two to handle the different permutatations. After a few releases, we'll remove this and strictly require the ECS fields.

Expand Down
12 changes: 9 additions & 3 deletions detection_rules/beats.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def get_datasets_and_modules(tree: eql.ast.BaseNode | kql.ast.BaseNode) -> tuple
modules: set[str] = set()
datasets: set[str] = set()

# extract out event.module and event.dataset from the query's AST
# extract out event.module, data_stream.dataset, and event.dataset from the query's AST
for node in tree: # type: ignore[reportUnknownVariableType]
if (
isinstance(node, eql.ast.Comparison)
Expand All @@ -265,17 +265,23 @@ def get_datasets_and_modules(tree: eql.ast.BaseNode | kql.ast.BaseNode) -> tuple
):
if node.left == eql.ast.Field("event", ["module"]):
modules.add(node.right.render()) # type: ignore[reportUnknownMemberType]
elif node.left == eql.ast.Field("event", ["dataset"]):
elif node.left == eql.ast.Field("event", ["dataset"]) or node.left == eql.ast.Field(
"data_stream", ["dataset"]
):
datasets.add(node.right.render()) # type: ignore[reportUnknownMemberType]
elif isinstance(node, eql.ast.InSet):
if node.expression == eql.ast.Field("event", ["module"]):
modules.update(node.get_literals()) # type: ignore[reportUnknownMemberType]
elif node.expression == eql.ast.Field("event", ["dataset"]):
elif node.expression == eql.ast.Field("event", ["dataset"]) or node.expression == eql.ast.Field(
"data_stream", ["dataset"]
):
datasets.update(node.get_literals()) # type: ignore[reportUnknownMemberType]
elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.module"): # type: ignore[reportUnknownMemberType]
modules.update(child.value for child in node.value if isinstance(child, kql.ast.String)) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.dataset"): # type: ignore[reportUnknownMemberType]
datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String)) # type: ignore[reportUnknownMemberType, reportUnknownVariableType]
elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("data_stream.dataset"): # type: ignore[reportUnknownMemberType]
datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String)) # type: ignore[reportUnknownMemberType]

return datasets, modules

Expand Down
2 changes: 1 addition & 1 deletion detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,7 @@ def get_packaged_integrations(
datasets, _ = beats.get_datasets_and_modules(data.get("ast") or []) # type: ignore[reportArgumentType]

# integration is None to remove duplicate references upstream in Kibana
# chronologically, event.dataset is checked for package:integration, then rule tags
# chronologically, event.dataset, data_stream.dataset is checked for package:integration, then rule tags
# if both exist, rule tags are only used if defined in definitions for non-dataset packages
# of machine learning analytic packages

Expand Down
8 changes: 4 additions & 4 deletions detection_rules/rule_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> KQL_ERRO
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
trailer = f"\nTry adding event.module or data_stream.dataset to specify beats module\n\n{trailer}"

return kql.KqlParseError(
exc.error_msg, # type: ignore[reportUnknownArgumentType]
Expand Down Expand Up @@ -258,7 +258,7 @@ def validate_integration( # noqa: PLR0912
if exc.error_msg == "Unknown field":
field = extract_error_field(self.query, exc)
trailer = (
f"\n\tTry adding event.module or event.dataset to specify integration module\n\t"
f"\n\tTry adding event.module or data_stream.dataset to specify integration module\n\t"
f"Will check against integrations {meta.integration} combined.\n\t"
f"{package=}, {integration=}, {integration_schema_data['package_version']=}, "
f"{integration_schema_data['stack_version']=}, "
Expand Down Expand Up @@ -512,7 +512,7 @@ def validate_integration( # noqa: PLR0912
if message == "Unknown field" or "Field not recognized" in message:
field = extract_error_field(self.query, exc)
trailer = (
f"\n\tTry adding event.module or event.dataset to specify integration module\n\t"
f"\n\tTry adding event.module or data_stream.dataset to specify integration module\n\t"
f"Will check against integrations {meta.integration} combined.\n\t"
f"{package=}, {integration=}, {package_version=}, "
f"{stack_version=}, {ecs_version=}"
Expand Down Expand Up @@ -571,7 +571,7 @@ def validate_query_with_schema(
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
trailer = f"\nTry adding event.module or data_stream.dataset to specify beats module\n\n{trailer}"
elif "Field not recognized" in message:
text_fields = self.text_fields(schema)
if text_fields:
Expand Down
1 change: 1 addition & 0 deletions detection_rules/schemas/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def validator_wrapper(value: Any) -> Any:
"OS: Linux",
"OS: macOS",
"OS: Windows",
"Promotion: External Alerts",
"Rule Type: BBR",
"Resources: Investigation Guide",
"Rule Type: Higher-Order Rule",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.14"
version = "1.3.15"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
106 changes: 106 additions & 0 deletions rules/promotions/crowdstrike_external_alerts.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
[metadata]
creation_date = "2025/08/05"
integration = ["crowdstrike"]
maturity = "production"
promotion = true
min_stack_version = "8.18.0"
min_stack_comments = "Introduced support for CrowdStrike alert promotion"
updated_date = "2025/08/05"

[rule]
author = ["Elastic"]
description = """
Generates a detection alert for each CrowdStrike alert written to the configured indices. Enabling this rule allows you
to immediately begin investigating CrowdStrike alerts in the app.
"""
from = "now-2m"
index = ["logs-crowdstrike.alert-*"]
interval = "1m"
language = "kuery"
license = "Elastic License v2"
max_signals = 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason we set this to 1000?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consistency with #4556

name = "CrowdStrike External Alerts"
note = """## Triage and analysis

### Investigating CrowdStrike External Alerts

CrowdStrike Falcon is a cloud-native endpoint protection platform that delivers real-time threat detection and response capabilities. The 'Behavior - Detected - CrowdStrike Alerts' rule captures security alerts generated by Falcon and enables analysts to investigate threats rapidly based on behavioral indicators and threat intelligence.

### Possible investigation steps

- Review the associated process, file path, and command line to determine whether the activity is legitimate or suspicious.
- Investigate the user account and host involved in the alert to validate whether the activity was authorized.
- Cross-reference the alert with CrowdStrike Falcon console for additional context, including process tree, behavioral tags, and threat intelligence matches.
- Check for any related alerts from the same host, user, or file hash to identify whether this is part of a larger attack chain.
- Consult the Crowdstrike investigation guide and resources tagged in the alert for specific guidance on handling similar threats.

### False positive analysis

- Alerts involving known and trusted software tools (e.g., remote administration tools) may be false positives. Confirm intent before excluding.
- Security assessments or penetration testing activities might mimic real threats. Validate the activity with responsible teams.
- Scheduled jobs, IT scripts, or automation tools may trigger alerts if they behave similarly to malicious code.
- Review alerts based on detection confidence levels and behavioral scoring to filter out low-confidence or known-benign triggers.

### Response and remediation

- Isolate affected endpoints to prevent lateral movement if malicious behavior is confirmed.
- Quarantine any identified malicious files and block related hashes or domains.
- Investigate how the threat entered the environment and close any exploited vulnerabilities.
- Reset credentials for compromised user accounts or escalate to incident response.
- Review CrowdStrike Falcon policies and detections to fine-tune future alerting and response coverage.
- Document the findings and update detection logic or exceptions accordingly.
"""
references = ["https://docs.elastic.co/en/integrations/crowdstrike"]
risk_score = 47
rule_id = "aeebe561-c338-4118-9924-8cb4e478aa58"
rule_name_override = "message"
setup = """## Setup

### CrowdStrike Alert Integration
This rule is designed to capture alert events generated by the CrowdStrike integration and promote them as Elastic detection alerts.

To capture CrowdStrike alerts, install and configure the CrowdStrike integration to ingest alert events into the `logs-crowdstrike.alert-*` index pattern.

If this rule is enabled alongside the External Alerts promotion rule (UUID: eb079c62-4481-4d6e-9643-3ca499df7aaa), you may receive duplicate alerts for the same CrowdStrike events. Consider adding a rule exception for the External Alert rule to exclude data_stream.dataset:crowdstrike.alert to avoid receiving duplicate alerts.

### Additional notes

For information on troubleshooting the maximum alerts warning please refer to this [guide](https://www.elastic.co/guide/en/security/current/alerts-ui-monitor.html#troubleshoot-max-alerts).
"""
severity = "medium"
tags = ["Data Source: CrowdStrike", "Use Case: Threat Detection", "Resources: Investigation Guide", "Promotion: External Alerts"]
timestamp_override = "event.ingested"
type = "query"

query = '''
event.kind: alert and data_stream.dataset: crowdstrike.alert
'''

[[rule.risk_score_mapping]]
field = "crowdstrike.alert.incident.score"
operator = "equals"
value = ""

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "low"
value = "21"

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "medium"
value = "47"

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "high"
value = "73"

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "critical"
value = "99"
110 changes: 110 additions & 0 deletions rules/promotions/elastic_security_external_alerts.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
[metadata]
creation_date = "2025/08/05"
integration = ["elastic_security"]
maturity = "production"
promotion = true
min_stack_version = "8.18.0"
min_stack_comments = "Introduced support for Elastic Security alert promotion"
updated_date = "2025/08/05"

[rule]
author = ["Elastic"]
description = """
Generates a detection alert for each Elastic Security alert written to the configured indices. Enabling this rule
allows you to immediately begin investigating Elastic Security alerts in the app.
"""
from = "now-2m"
index = ["logs-elastic_security.alert-*"]
interval = "1m"
language = "kuery"
license = "Elastic License v2"
max_signals = 1000
name = "Elastic Security External Alerts"
note = """
## Triage and analysis

### Investigating Elastic Security External Alerts

Elastic Security is a comprehensive security platform that provides real-time visibility into your environment, helping you detect and respond to threats effectively. The 'Behavior - Detected - Elastic Security Alerts' rule identifies such threats by monitoring specific alert events, enabling analysts to swiftly investigate and mitigate potential security incidents.

### Possible investigation steps

- Correlate the alert with recent activity on the affected endpoint to identify any unusual or suspicious behavior patterns.
- Check for any additional alerts or logs related to the same endpoint or user to determine if this is part of a broader attack or isolated incident.
- Investigate the source and destination IP addresses involved in the alert to assess if they are known to be malicious or associated with previous threats.
- Analyze any files or processes flagged in the alert to determine if they are legitimate or potentially malicious, using threat intelligence sources if necessary.
- Consult the Elastic Security investigation guide and resources tagged in the alert for specific guidance on handling similar threats.

### False positive analysis

- Alerts triggered by routine software updates or patches can be false positives. Review the context of the alert to determine if it aligns with scheduled maintenance activities.
- Legitimate administrative tools or scripts may trigger alerts. Identify and whitelist these tools if they are verified as non-threatening.
- Frequent alerts from known safe applications or processes can be excluded by creating exceptions for these specific behaviors in the Elastic Security configuration.
- Network scanning or monitoring tools used by IT teams might be flagged. Ensure these tools are documented and excluded from triggering alerts if they are part of regular operations.
- User behavior that is consistent with their role but triggers alerts should be reviewed. If deemed non-malicious, adjust the rule to exclude these specific user actions.

### Response and remediation

- Isolate the affected endpoint immediately to prevent lateral movement and further compromise within the network.
- Analyze the specific alert details to identify the nature of the threat and any associated indicators of compromise (IOCs).
- Remove or quarantine any malicious files or processes identified by the Elastic Security alert to neutralize the threat.
- Apply relevant security patches or updates to address any exploited vulnerabilities on the affected endpoint.
- Conduct a thorough scan of the network to identify any additional endpoints that may have been compromised or are exhibiting similar behavior.
- Document the incident and escalate to the appropriate security team or management if the threat is part of a larger attack campaign or if additional resources are needed for remediation.
- Review and update endpoint protection policies and configurations to enhance detection and prevention capabilities against similar threats in the future.
"""
references = ["https://docs.elastic.co/en/integrations/elastic_security"]
risk_score = 47
rule_id = "720fc1aa-e195-4a1d-81d8-04edfe5313ed"
rule_name_override = "rule.name"
setup = """## Setup

### Elastic Security Alert Integration
This rule is designed to capture alert events generated by the Elastic Security integration and promote them as Elastic detection alerts.

To capture Elastic Security alerts, install and configure the Elastic Security integration to ingest alert events into the `logs-elastic_security.alert-*` index pattern.

If this rule is enabled alongside the External Alerts promotion rule (UUID: eb079c62-4481-4d6e-9643-3ca499df7aaa), you may receive duplicate alerts for the same Elastic Security events. Consider adding a rule exception for the External Alert rule to exclude data_stream.dataset:elastic_security.alert to avoid receiving duplicate alerts.

### Additional notes

For information on troubleshooting the maximum alerts warning please refer to this [guide](https://www.elastic.co/guide/en/security/current/alerts-ui-monitor.html#troubleshoot-max-alerts).
"""
severity = "medium"
tags = ["Data Source: Elastic Security", "Use Case: Threat Detection", "Resources: Investigation Guide", "Promotion: External Alerts"]
timestamp_override = "event.ingested"
type = "query"

query = '''
event.kind: alert and data_stream.dataset: elastic_security.alert
'''


[[rule.risk_score_mapping]]
field = "event.risk_score"
operator = "equals"
value = ""

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "low"
value = "21"

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "medium"
value = "47"

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "high"
value = "73"

[[rule.severity_mapping]]
field = "event.severity"
operator = "equals"
severity = "critical"
value = "99"
Loading
Loading