
output_fields validation #361

Draft: wants to merge 3 commits into base: main
17 changes: 17 additions & 0 deletions contentctl/input/director.py
@@ -255,6 +255,23 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
end="",
flush=True,
)
if contentType == SecurityContentType.data_sources:
# After resolving all data_sources, we need to complete mappings
# that can ONLY be done after all data_sources have been parsed.
# This is because datasources may reference each other
# and we cannot resolve those references until all data sources have been parsed.
for ds in self.output_dto.data_sources:
try:
ds.resolveDataSourceObject(self.output_dto)
except (ValidationError, ValueError) as e:
if ds.file_path is None:
validation_errors.append((relative_path, ValueError(f"File path for DataSource {ds.name} was None.")))
validation_errors.append((Path("PATH_NOT_FOUND"), e))
else:
relative_path = ds.file_path.absolute().relative_to(
self.input_dto.path.absolute()
)
validation_errors.append((relative_path, e))
print("Done!")

        if len(validation_errors) > 0:
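The new second pass exists because a data source's convert_to_log_source entries refer to other data sources by name, and the referenced source may not have been parsed yet when the referencing file is read. The sketch below shows the same parse-everything-then-resolve pattern with hypothetical stand-in classes (Source, resolve_all) rather than the real contentctl objects:

from pathlib import Path


class Source:
    """Hypothetical stand-in for a parsed data source (illustration only)."""

    def __init__(self, name: str, converts_to: list[str], file_path: Path | None):
        self.name = name
        self.converts_to = converts_to        # names of other sources, not objects
        self.file_path = file_path
        self.resolved: list["Source"] = []    # filled in by the second pass


def resolve_all(sources: list[Source]) -> list[tuple[Path, Exception]]:
    """Second pass: every source is already parsed, so name references can be resolved."""
    by_name = {s.name: s for s in sources}
    errors: list[tuple[Path, Exception]] = []
    for s in sources:
        for ref in s.converts_to:
            if ref in by_name:
                s.resolved.append(by_name[ref])
            else:
                path = s.file_path if s.file_path is not None else Path("PATH_NOT_FOUND")
                errors.append((path, ValueError(f"No data source named '{ref}' exists")))
    return errors

Collecting failures into a list, as director.py does with validation_errors, lets every unresolved reference surface in a single run instead of stopping at the first one.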
71 changes: 62 additions & 9 deletions contentctl/objects/data_source.py
@@ -1,25 +1,71 @@
from __future__ import annotations
from typing import Optional, Any
from pydantic import Field, HttpUrl, model_serializer, BaseModel
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from contentctl.input.director import DirectorOutputDto

from enum import StrEnum, auto


from pydantic import BaseModel, Field, HttpUrl, model_serializer, ConfigDict, computed_field
from functools import cached_property

from contentctl.objects.security_content_object import SecurityContentObject


class TA(BaseModel):
    model_config = ConfigDict(extra="forbid")
    name: str
    url: HttpUrl | None = None
    url: HttpUrl
    version: str


class DataSourceDataModel(StrEnum):
    ocsf = auto()
    custom_cim = auto()
    cim = auto()


class Field_Mapping(BaseModel):
    model_config = ConfigDict(extra="forbid")
    data_model: DataSourceDataModel
    data_set: str | None = None
    mapping: dict[str, str]


class LogConvert(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # This should really be a DataSource object, but the order in which
    # data_sources are defined makes that challenging, so we keep both the
    # raw name and the resolved object around for now.
    data_source: str
    _data_source_object: DataSource | None = None
    mapping: dict[str, str]

    @computed_field
    @cached_property
    def data_source_object(self) -> DataSource:
        if self._data_source_object is None:
            raise ValueError(
                f"Error - LogConvert.data_source object {self.data_source} has not been "
                "resolved. Please ensure that 'resolveDataSourceObject' has been called."
            )
        return self._data_source_object

    def resolveDataSourceObject(self, director: DirectorOutputDto | None) -> None:
        self._data_source_object = DataSource.mapNamesToSecurityContentObjects([self.data_source], director)[0]


class DataSource(SecurityContentObject):
    model_config = ConfigDict(extra="forbid")
    source: str = Field(...)
    sourcetype: str = Field(...)
    separator: Optional[str] = None
    configuration: Optional[str] = None
    supported_TA: list[TA] = []
    fields: None | list = None
    field_mappings: None | list = None
    convert_to_log_source: None | list = None
    separator: None | str = None
    configuration: None | str = None
    supported_TA: list[TA]
    fields: list[str] = []
    field_mappings: list[Field_Mapping] = []
    convert_to_log_source: list[LogConvert] = []
    example_log: None | str = None
    output_fields: list[str] = []

    @model_serializer
    def serialize_model(self):
@@ -44,3 +90,10 @@

        # return the model
        return super_fields

    def resolveDataSourceObject(self, director: DirectorOutputDto | None) -> None:
        for index, log in enumerate(self.convert_to_log_source):
            try:
                log.resolveDataSourceObject(director)
            except Exception as e:
raise ValueError(f"Error encountered when resolving field 'convert_to_log_source[{index}].data_source: {log.data_source}'. No DataSource by the name '{log.data_source}' exists")
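LogConvert keeps the raw data_source string alongside a privately stored resolved object, and exposes the latter through a computed field that raises until resolution has run. A minimal, self-contained sketch of that pattern in Pydantic v2 follows; the Target and Reference names are invented for illustration and are not part of contentctl:

from __future__ import annotations

from functools import cached_property

from pydantic import BaseModel, computed_field


class Target(BaseModel):
    name: str


class Reference(BaseModel):
    # The name is validated immediately; the object it points to is attached later.
    target_name: str
    _target: Target | None = None  # private attribute, ignored by validation and serialization

    @computed_field
    @cached_property
    def target(self) -> Target:
        if self._target is None:
            raise ValueError(f"Reference to '{self.target_name}' has not been resolved yet")
        return self._target

    def resolve(self, registry: dict[str, Target]) -> None:
        self._target = registry[self.target_name]


# Usage: parse everything first, then resolve references by name.
registry = {"web_proxy": Target(name="web_proxy")}
ref = Reference(target_name="web_proxy")
ref.resolve(registry)
assert ref.target.name == "web_proxy"

Because target is declared as a computed field it is included in serialization, so dumping a Reference before resolve has been called will raise; the same trade-off should apply to LogConvert.data_source_object above.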