Skip to content

TAXII Collector bot and STIX Parser bot #2611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions intelmq/bots/collectors/taxii/REQUIREMENTS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2025 Ladislav Baco
# SPDX-License-Identifier: AGPL-3.0-or-later

taxii2-client>=2.3.0
Empty file.
64 changes: 64 additions & 0 deletions intelmq/bots/collectors/taxii/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
SPDX-FileCopyrightText: 2025 Ladislav Baco
SPDX-License-Identifier: AGPL-3.0-or-later

Get indicator objects from TAXII server

Configuration parameters: TAXII collection (feed) URL, username and password.
"""

import datetime
import json
from requests.exceptions import HTTPError

from intelmq.lib.bot import CollectorBot
from intelmq.lib.exceptions import MissingDependencyError

try:
import taxii2client.v21 as taxii2
except ImportError:
taxii2 = None


class TaxiiCollectorBot(CollectorBot):
    """Collect STIX indicator objects from a TAXII collection.

    Every object returned by the collection is sent as a separate report
    whose ``raw`` field is set from the JSON-serialized object.
    """
    # URL of the TAXII collection (feed); required
    collection: str = None
    # credentials handed to the TAXII client; both are required
    username: str = None
    password: str = None
    rate_limit: int = 3600
    # size (in seconds) of the look-back window used for the ``added_after`` filter
    time_delta: int = 3600

    def init(self):
        """Validate the configuration and create the TAXII collection client.

        Raises:
            MissingDependencyError: if ``taxii2-client`` could not be imported.
            ValueError: if the collection URL, username or password is not set.
        """
        if taxii2 is None:
            raise MissingDependencyError('taxii2-client')

        if self.collection is None:
            raise ValueError('No TAXII collection URL provided.')
        if self.username is None:
            raise ValueError('No TAXII username provided.')
        if self.password is None:
            raise ValueError('No TAXII password provided.')

        # Use a timezone-aware UTC timestamp. A naive local timestamp would be
        # serialized as if it were UTC (taxii2-client assumes UTC for naive
        # datetimes), shifting the window by the host's UTC offset.
        now_utc = datetime.datetime.now(tz=datetime.timezone.utc)
        self._date_after = now_utc - datetime.timedelta(seconds=int(self.time_delta))

        self._taxii_collection = taxii2.Collection(self.collection, user=self.username, password=self.password)

    def process(self):
        """Fetch indicator objects from the collection and send one report per object.

        HTTP errors are logged and otherwise ignored, so a failing run sends
        no reports and does not raise.
        """
        # NOTE(review): the cut-off ``self._date_after`` is computed once in
        # init(), so every run queries from the same point in time - confirm
        # that re-fetching already collected objects is intended.
        try:
            title = self._taxii_collection.title
            self.logger.info('Collection title: %r.', title)

            # get the indicator objects
            envelope = self._taxii_collection.get_objects(added_after=self._date_after, type='indicator')
            for obj in envelope.get('objects', []):
                report = self.new_report()
                report.add('raw', json.dumps(obj))
                report.add('feed.url', self.collection)
                report.add('feed.code', title)
                self.send_message(report)

        except HTTPError as e:
            self.logger.error('Connection error: %r!', e)


BOT = TaxiiCollectorBot
Empty file.
88 changes: 88 additions & 0 deletions intelmq/bots/parsers/stix/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
SPDX-FileCopyrightText: 2025 Ladislav Baco
SPDX-License-Identifier: AGPL-3.0-or-later

Parse indicator objects in STIX format received from the TAXII collector
"""

import json

from intelmq.lib.bot import ParserBot


class StixParserBot(ParserBot):
    """Parse STIX indicator objects into IntelMQ events.

    Only objects of type ``indicator`` whose ``pattern_type`` is ``stix`` are
    turned into events; everything else is logged and dropped.
    """
    parse = ParserBot.parse_json_stream
    recover_line = ParserBot.recover_line_json_stream

    def parse_line(self, line, report):
        """Parse one STIX object of indicator type.

        Args:
            line: the decoded STIX object (a mapping).
            report: the report the object was taken from.

        Yields:
            One event per supported indicator. The event is yielded even when
            no observable could be extracted from the pattern; in that case a
            warning is logged.
        """
        object_type = line.get('type', '')
        if object_type != 'indicator':
            self.logger.warning('Unexpected type of STIX object: %r', object_type)
            return

        event = self.new_event(report)
        event.add('raw', json.dumps(line))
        event.add('comment', line.get('description', ''))
        event.add('extra.labels', line.get('labels', None))
        event.add('time.source', line.get('valid_from', '1970-01-01T00:00:00Z'))
        # classification will be determined by expert bot specific for given TAXII collection
        event.add('classification.type', 'undetermined')

        pattern = line.get('pattern', '')
        # stix, pcre, sigma, snort, suricata, yara
        pattern_type = line.get('pattern_type', '')

        if pattern_type != 'stix':
            self.logger.warning('Unexpected type of pattern expression: %r, pattern: %r', pattern_type, pattern)
            return

        indicator = self.parse_stix_pattern(pattern)
        if indicator:
            event.add(indicator[0], indicator[1])
        else:
            # reported here because the static pattern parser has no logger
            self.logger.warning('Could not extract an indicator from STIX pattern: %r', pattern)
        self.parse_vendor_specific(event, line, report)
        yield event

    def parse_vendor_specific(self, event, line, report):
        """
        Parse vendor specific details from the STIX 2.1 Indicator object.
        This method by default does nothing and it is called just before IntelMQ event is yielded.
        If we need vendor-specific STIX parser, we can inherit from this class and override this one method.
        """
        return

    @staticmethod
    def parse_stix_pattern(pattern):
        """Extract a single observable from a STIX pattern.

        STIX Patterning:
        https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_e8slinrhxcc9

        Only patterns with exactly one observation expression are supported,
        and only the ``url``, ``domain-name``, ``ipv4-addr`` and ``ipv6-addr``
        ``value`` comparisons are recognised.

        Args:
            pattern: the STIX pattern string.

        Returns:
            A ``(field_name, value)`` tuple, or ``None`` when the pattern is
            empty, has more than one observation expression, contains no
            quoted literal, or uses an unsupported object type.
        """
        if not pattern or pattern.count('[') != 1:
            return None

        # the value is the first single-quoted literal; patterns without any
        # quoted literal (e.g. numeric comparisons) cannot be handled
        pieces = pattern.split("'")
        if len(pieces) < 2:
            return None
        value = pieces[1]

        if pattern.startswith('[url:value = '):
            return ('source.url', value)
        if pattern.startswith('[domain-name:value = '):
            return ('source.fqdn', value)
        if pattern.startswith('[ipv4-addr:value = '):
            # remove port, sometimes the port is present in ETI
            value = value.split(':')[0]
            # strip CIDR if IPv4 network contains single host only
            if value.endswith('/32'):
                value = value[:-3]
            # anything still in CIDR notation is a network, otherwise a host
            return ('source.network' if '/' in value else 'source.ip', value)
        if pattern.startswith('[ipv6-addr:value = '):
            # strip CIDR if IPv6 network contains single host only
            if value.endswith('/128'):
                value = value[:-4]
            # anything still in CIDR notation is a network, otherwise a host
            return ('source.network' if '/' in value else 'source.ip', value)
        return None


BOT = StixParserBot
111 changes: 111 additions & 0 deletions intelmq/bots/parsers/stix/parser_eset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# SPDX-FileCopyrightText: 2025 Ladislav Baco
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
"""
Parser bot for ESET Threat Intelligence feeds
This bot parses STIX Indicator objects received from the TAXII collector.
Then it analyzes the event's comment, which is based on the STIX indicator's
description, and adds classification.type and malware family info.
It is recommended to apply TaxonomyExpertBot afterwards to map the taxonomy.
"""

import re

from intelmq.bots.parsers.stix.parser import StixParserBot


# Exact indicator descriptions mapped to the IntelMQ classification.type.
CLASSIFICATION_BY_STRING = {
    'Host actively distributes high-severity malicious content in the form of executable code.': 'malware-distribution',
    'Host actively distributes high-severity threat in the form of executable code.': 'malware-distribution',
    'Host actively distributes high-severity threat in the form of malicious code.': 'malware-distribution',
    'Host actively distributes high-severity threat in the form of script code.': 'malware-distribution',
    'Host is known to be actively distributing adware or other medium-risk software.': 'malware-distribution',
    'Host is known to be actively distributing high-severity mobile threats or low-risk software.': 'other',
    'Host is known to be actively distributing threats or is of uncertain reputation.': 'other',
    'Host is known to be distributing low-risk and potentially unwanted content.': 'other',
    'Host actively distributes potentially unwanted or unsafe threat.': 'other',
    'Host is known source of phishing or other fraudulent content.': 'phishing',
    'Host is known source of active fraudulent content.': 'other',
    'Host is used as command and control server.': 'c2-server',
    'Web services scanning and attacks': 'scanner',
    'RDP bruteforce IP': 'brute-force',
    'SQL bruteforce IP': 'brute-force',
    'SMB bruteforce IP': 'brute-force',
    'MySQL bruteforce IP': 'brute-force',
    'FTP bruteforce IP': 'brute-force',
}

# Description patterns mapped to the IntelMQ classification.type.
# The first capture group of every pattern is the malware / detection name.
# Patterns are tried in insertion order and are applied with ``match``, i.e.
# anchored at the start of the description.
CLASSIFICATION_REGEX = {
    r'C&C indicates that a botnet ([^ ]+) ([^ ]+) is present.': 'c2-server',
    r'C&C of ([^ ]+) ([^ ]+)': 'c2-server',
    r'Host is used as command and control server of ([^ ]+) ([^ ]+) malware family.': 'c2-server',
    r'Each of these file hashes indicates that a variant of ([^ ]+) ([^ ]+) is present.': 'malware',
    # ``(?:.* )?`` allows an optional leading phrase before the name; the
    # non-capturing group keeps the name in capture group 1
    r'^(?:.* )?([^ ]+) C&C server.*$': 'c2-server',
    r'^(?:.* )?([^ ]+) backdoor.*$': 'malware',
    r'^(?:.* )?([^ ]+) trojan.*$': 'malware',
    r'^(?:.* )?([^ ]+) implant.*$': 'malware',
    r'Loader for ([^ ]+).*$': 'malware',
}

# Same mapping as CLASSIFICATION_REGEX, keyed by the compiled pattern.
CLASSIFICATION_BY_REGEX = {
    re.compile(expression): mapped_type
    for expression, mapped_type in CLASSIFICATION_REGEX.items()
}


class ESETStixParserBot(StixParserBot):
    """STIX parser that derives classification.type and malware.name from the event comment."""

    # Platform/Type.Family.Variant!Suffixes
    # Type and suffixes are optional
    _malware_naming_convention_pattern = re.compile(r'^([^/]*/)?([^\.]*\.)?([^\.]+)(\.[^!]*)(!.*)?$')

    def parse_vendor_specific(self, event, line, report):
        """Classify the event from its comment unless it already has a determined type.

        Overwrites ``classification.type`` with the result of ``classify`` and
        adds ``malware.name`` when a malware name was extracted.
        """
        if event.get('classification.type', 'undetermined') != 'undetermined':
            # classification.type already present, do not change it
            return

        detected_type, family = self.classify(event.get('comment', ''))
        event.add('classification.type', detected_type, overwrite=True)
        if family:
            event.add('malware.name', family)

    @staticmethod
    def classify(comment):
        """Map a comment to ``(classification_type, malware_name)``.

        Exact descriptions are looked up first and never carry a malware name.
        Otherwise the first matching regular expression decides the type and
        its first capture group is used to derive the malware name. When
        nothing matches, ``('undetermined', None)`` is returned.
        """
        exact_type = CLASSIFICATION_BY_STRING.get(comment, None)
        if exact_type:
            return (exact_type, None)

        for compiled, regex_type in CLASSIFICATION_BY_REGEX.items():
            hit = compiled.match(comment)
            if not hit:
                continue
            captured = hit.groups()
            family = None
            if captured:
                family = ESETStixParserBot.extract_malware_family(captured[0])
            return (regex_type, family)

        return ('undetermined', None)

    @staticmethod
    def extract_malware_family(malware):
        """Return the lowercased malware family taken from a threat detection string.

        When the string follows the naming convention matched by
        ``_malware_naming_convention_pattern``, the third capture group (the
        family) is used; otherwise the whole string is treated as the family.
        """
        hit = ESETStixParserBot._malware_naming_convention_pattern.match(malware)
        if hit is not None and len(hit.groups()) == 5:
            family = hit.group(3)
        else:
            # usually just malware family (or unknown naming convention)
            family = malware

        # IntelMQ malware.name should be lowercase
        return family.lower()


BOT = ESETStixParserBot
Loading