Skip to content

[FR] [DAC] Add Arbitrary File location Support for Local Creation Date #4915

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Options:
-snv, --strip-none-values Strip None values from the rule
-lc, --local-creation-date Preserve the local creation date of the rule
-lu, --local-updated-date Preserve the local updated date of the rule
-lr, --load-rule-loading Enable arbitrary rule loading from the rules directories (Can be very slow!)
-h, --help Show this message and exit.
```

Expand Down Expand Up @@ -507,6 +508,7 @@ Options:
-lu, --local-updated-date Preserve the local updated date of the rule
-cro, --custom-rules-only Only export custom rules
-eq, --export-query TEXT Apply a query filter to exporting rules e.g. "alert.attributes.tags: \"test\"" to filter for rules that have the tag "test"
-lr, --load-rule-loading Enable arbitrary rule loading from the rules directories (Can be very slow!)
-h, --help Show this message and exit.

```
Expand Down
30 changes: 28 additions & 2 deletions detection_rules/kbwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
import sys
from pathlib import Path
from typing import Any
from typing import Any, cast

import click
import kql # type: ignore[reportMissingTypeStubs]
Expand All @@ -28,6 +28,7 @@
from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error
from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule
from .rule_loader import RuleCollection, update_metadata_from_file
from .schemas import definitions # noqa: TC001
from .utils import format_command_options, rulename_to_filename

RULES_CONFIG = parse_rules_config()
Expand Down Expand Up @@ -250,6 +251,12 @@ def _process_imported_items(
'"alert.attributes.tags: \\"test\\"" to filter for rules that have the tag "test"'
),
)
@click.option(
"--load-rule-loading",
"-lr",
is_flag=True,
help="Enable arbitrary rule loading from the rules directories (Can be very slow!)",
)
@click.pass_context
def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
ctx: click.Context,
Expand All @@ -268,6 +275,7 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
local_updated_date: bool = False,
custom_rules_only: bool = False,
export_query: str | None = None,
load_rule_loading: bool = False,
) -> list[TOMLRule]:
"""Export custom rules from Kibana."""
kibana = ctx.obj["kibana"]
Expand All @@ -277,6 +285,10 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
if rule_name and rule_id:
raise click.UsageError("Cannot use --rule-id and --rule-name together. Please choose one.")

rules = None
if load_rule_loading:
rules = RuleCollection.default()

with kibana:
# Look up rule IDs by name if --rule-name was provided
if rule_name:
Expand Down Expand Up @@ -358,10 +370,24 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
tactic_name = first_tactic if not no_tactic_filename else None # type: ignore[reportUnknownMemberType]
rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=tactic_name) # type: ignore[reportUnknownMemberType]

local_contents = None
save_path = directory / f"{rule_name}"

# Get local rule data if load_rule_loading is enabled. If not enabled rules variable will be None.
local_rule: dict[str, Any] = params.get("rule", {})
input_rule_id: str | None = None

if local_rule:
input_rule_id = cast("definitions.UUIDString", local_rule.get("rule_id"))

if rules and input_rule_id and input_rule_id in rules.id_map:
save_path = rules.id_map[input_rule_id].path
local_contents = rules.id_map[input_rule_id].contents
params.update(
update_metadata_from_file(
save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
{"creation_date": local_creation_date, "updated_date": local_updated_date},
save_path,
local_contents,
)
)
contents = TOMLRuleContents.from_rule_resource(**params) # type: ignore[reportArgumentType]
Expand Down
22 changes: 20 additions & 2 deletions detection_rules/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ def generate_rules_index(
@click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule")
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
@click.option(
"--load-rule-loading",
"-lr",
is_flag=True,
help="Enable arbitrary rule loading from the rules directories (Can be very slow!)",
)
def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
input_file: tuple[Path, ...] | None,
required_only: bool,
Expand All @@ -171,6 +177,7 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
strip_none_values: bool,
local_creation_date: bool,
local_updated_date: bool,
load_rule_loading: bool,
) -> None:
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
errors: list[str] = []
Expand All @@ -189,6 +196,10 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
if not file_contents:
click.echo("Must specify at least one file!")

rules = None
if load_rule_loading:
rules = RuleCollection.default()

exceptions_containers = {}
exceptions_items = {}

Expand All @@ -210,7 +221,14 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915
base_path = rulename_to_filename(base_path) if base_path else base_path
if base_path is None:
raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}")
rule_path = Path(os.path.join(str(save_directory) if save_directory else RULES_DIRS[0], base_path)) # noqa: PTH118

local_contents = None
rule_base_path = Path(save_directory or RULES_DIRS[0])
rule_path = rule_base_path / base_path
rule_id = contents.get("rule_id")
if rules and rule_id in rules.id_map:
rule_path = rules.id_map[rule_id].path
local_contents = rules.id_map[rule_id].contents

# handle both rule json formats loaded from kibana and toml
data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id")
Expand All @@ -226,7 +244,7 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915

contents.update(
update_metadata_from_file(
Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
{"creation_date": local_creation_date, "updated_date": local_updated_date}, rule_path, local_contents
)
)

Expand Down
19 changes: 13 additions & 6 deletions detection_rules/rule_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,24 @@ def load_locks_from_tag(
return commit_hash, version, deprecated


def update_metadata_from_file(rule_path: Path, fields_to_update: dict[str, Any]) -> dict[str, Any]:
"""Update metadata fields for a rule with local contents."""
def update_metadata_from_file(
fields_to_update: dict[str, Any], rule_path: Path | None = None, local_contents: TOMLRuleContents | None = None
) -> dict[str, Any]:
"""Update metadata fields for a rule with local contents or provided contents."""

contents: dict[str, Any] = {}
if not rule_path.exists():
return contents
if not rule_path and not local_contents:
raise ValueError("Either 'rule_path' or 'local_contents' must be provided.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙇🏻‍♂️


rule_contents = RuleCollection().load_file(rule_path).contents
# If local_contents is provided, always prefer it vs loading from rule_path
rule_contents = None or local_contents
if rule_path and not rule_contents:
if not rule_path.exists():
return contents
rule_contents = RuleCollection().load_file(rule_path).contents

if not isinstance(rule_contents, TOMLRuleContents):
raise TypeError("TOML rule expected")
raise TypeError("TOMLRuleContents expected")

local_metadata = rule_contents.metadata.to_dict()
if local_metadata:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.10"
version = "1.3.11"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading