Skip to content

Commit 15b6789

Browse files
committed
Add ESET STIX Parser bot
Parser bot for enriching events from ESET Threat Intelligence that were collected by TaxiiCollectorBot. It inherits from the generic StixParserBot and implements vendor-specific parsing. The ESET STIX Parser bot analyzes the event comment (based on the original description of the STIX Indicator object), chooses the proper classification type and, if possible, also fills in malware.name in the event.
1 parent 69984ba commit 15b6789

File tree

4 files changed

+256
-12
lines changed

4 files changed

+256
-12
lines changed

intelmq/bots/parsers/stix/parser.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,21 @@ def parse_line(self, line, report):
3535
indicator = self.parse_stix_pattern(pattern)
3636
if indicator:
3737
event.add(indicator[0], indicator[1])
38+
self.parse_vendor_specific(event, line, report)
3839
yield event
3940
else:
4041
self.logger.warning('Unexpected type of pattern expression: %r, pattern: %r', pattern_type, pattern)
4142
else:
4243
self.logger.warning('Unexpected type of STIX object: %r', object_type)
4344

45+
def parse_vendor_specific(self, event, line, report):
    """
    Hook for vendor-specific enrichment of an event built from a STIX 2.1 Indicator.

    The generic parser calls this hook right before it yields the IntelMQ event.
    The default implementation deliberately does nothing; a vendor-specific STIX
    parser inherits from this class and overrides only this method.

    Parameters:
        event: the IntelMQ event about to be yielded; overriding methods may modify it
        line: the parsed STIX object the event was built from
        report: the report the line belongs to

    Returns:
        None
    """
    return None
52+
4453
@staticmethod
4554
def parse_stix_pattern(pattern):
4655
"""
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# SPDX-FileCopyrightText: 2025 Ladislav Baco
2+
#
3+
# SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
# -*- coding: utf-8 -*-
6+
"""
7+
Parser bot for ESET Threat Intelligence feeds
8+
This bot parses STIX Indicator objects received from the TAXII collector.
9+
Then it analyzes event's comments based on STIX indicator's description
10+
and it adds classification.type and malware family info
11+
It is recommended to apply TaxonomyExpertBot afterwards to map the taxonomy.
12+
"""
13+
14+
import re
15+
16+
from intelmq.bots.parsers.stix.parser import StixParserBot
17+
18+
19+
# Exact indicator descriptions grouped by the classification.type they map to.
# The groups are listed in the order in which the entries must appear in the
# resulting dictionary; a classification may therefore occur in several groups.
_DESCRIPTIONS_BY_CLASSIFICATION = (
    ('malware-distribution', (
        'Host actively distributes high-severity malicious content in the form of executable code.',
        'Host actively distributes high-severity threat in the form of executable code.',
        'Host actively distributes high-severity threat in the form of malicious code.',
        'Host actively distributes high-severity threat in the form of script code.',
        'Host is known to be actively distributing adware or other medium-risk software.',
    )),
    ('other', (
        'Host is known to be actively distributing high-severity mobile threats or low-risk software.',
        'Host is known to be actively distributing threats or is of uncertain reputation.',
        'Host is known to be distributing low-risk and potentially unwanted content.',
        'Host actively distributes potentially unwanted or unsafe threat.',
    )),
    ('phishing', (
        'Host is known source of phishing or other fraudulent content.',
    )),
    ('other', (
        'Host is known source of active fraudulent content.',
    )),
    ('c2-server', (
        'Host is used as command and control server.',
    )),
    ('scanner', (
        'Web services scanning and attacks',
    )),
    ('brute-force', (
        'RDP bruteforce IP',
        'SQL bruteforce IP',
        'SMB bruteforce IP',
        'MySQL bruteforce IP',
        'FTP bruteforce IP',
    )),
)

# Lookup table: exact comment text -> classification.type
CLASSIFICATION_BY_STRING = {
    description: classification
    for classification, descriptions in _DESCRIPTIONS_BY_CLASSIFICATION
    for description in descriptions
}
39+
40+
# Ordered mapping: regular expression source -> classification.type.
# Patterns are tried in insertion order with re.match (anchored at the start
# of the comment); the first capturing group is always the malware/detection name.
#
# The last block of patterns accepts optional leading text that ends with a
# space, written as the non-capturing group "(?:.* )?". A character class such
# as "[.* ]?" would only accept a single '.', '*' or ' ' character instead.
# Sentence-ending dots are escaped so they match a literal period only.
CLASSIFICATION_REGEX = {
    r'C&C indicates that a botnet ([^ ]+) ([^ ]+) is present\.': 'c2-server',
    r'C&C of ([^ ]+) ([^ ]+)': 'c2-server',
    r'Host is used as command and control server of ([^ ]+) ([^ ]+) malware family\.': 'c2-server',
    r'Each of these file hashes indicates that a variant of ([^ ]+) ([^ ]+) is present\.': 'malware',
    r'^(?:.* )?([^ ]+) C&C server.*$': 'c2-server',
    r'^(?:.* )?([^ ]+) backdoor.*$': 'malware',
    r'^(?:.* )?([^ ]+) trojan.*$': 'malware',
    r'^(?:.* )?([^ ]+) implant.*$': 'malware',
    r'Loader for ([^ ]+).*$': 'malware',
}

# Same mapping keyed by the compiled pattern, so matching does not recompile.
CLASSIFICATION_BY_REGEX = {
    re.compile(regex): classification_type
    for regex, classification_type in CLASSIFICATION_REGEX.items()
}
55+
56+
57+
class ESETStixParserBot(StixParserBot):
    """Add classification.type and malware family to events"""

    # ESET detection name layout: Platform/Type.Family.Variant!Suffixes
    # The "Platform/", "Type." and "!Suffixes" parts are optional in the pattern;
    # "Family" followed by ".Variant" is required for the pattern to match.
    _malware_naming_convention_pattern = re.compile(r'^([^/]*/)?([^\.]*\.)?([^\.]+)(\.[^!]*)(!.*)?$')

    # Sentence punctuation that may stick to a detection name taken from free text
    _TRAILING_PUNCTUATION = '.,;:'

    def parse_vendor_specific(self, event, line, report):
        """
        Derive classification.type (and malware.name, if possible) from the event comment.

        Nothing is changed when the event already carries a classification.type
        other than 'undetermined'. Otherwise the comment is classified and the
        result overwrites classification.type; malware.name is added only when
        a malware family could be extracted.

        Parameters:
            event: the IntelMQ event to enrich (modified in place)
            line: unused here, part of the hook signature
            report: unused here, part of the hook signature
        """
        if event.get('classification.type', 'undetermined') != 'undetermined':
            # classification.type already present, do not change it
            return

        classification_type, malware_name = self.classify(event.get('comment', ''))
        event.add('classification.type', classification_type, overwrite=True)
        if malware_name:
            event.add('malware.name', malware_name)

    @staticmethod
    def classify(comment):
        """
        Classify a comment.

        Exact matches against CLASSIFICATION_BY_STRING win; otherwise the
        patterns of CLASSIFICATION_BY_REGEX are tried in order and the first
        capturing group of the first match is used as the detection name.

        Parameters:
            comment: the description text to classify

        Returns:
            Tuple (classification_type, malware_name). malware_name is None when
            no family could be determined; ('undetermined', None) is returned
            when nothing matches.
        """
        classification_type = CLASSIFICATION_BY_STRING.get(comment)
        if classification_type:
            return (classification_type, None)

        for pattern, classification_type in CLASSIFICATION_BY_REGEX.items():
            match = pattern.match(comment)
            if not match:
                continue
            groups = match.groups()
            malware_name = None
            # Guard against patterns without (or with an unmatched) first group
            if groups and groups[0]:
                malware_name = ESETStixParserBot.extract_malware_family(groups[0])
            return (classification_type, malware_name)

        return ('undetermined', None)

    @staticmethod
    def extract_malware_family(malware):
        """
        Extract the malware family from a threat detection string.

        Trailing sentence punctuation is removed first: a name captured from
        free text such as 'Win32/Emotet.B.' would otherwise be split at the
        wrong dots and yield the variant ('b') instead of the family.

        Parameters:
            malware: detection name, e.g. 'Win32/Spy.LummaStealer.B' or 'Emotet'

        Returns:
            The lowercase family name. If the string does not follow the naming
            convention, the whole (stripped) string is returned in lowercase.
        """
        candidate = malware.rstrip(ESETStixParserBot._TRAILING_PUNCTUATION)
        match = ESETStixParserBot._malware_naming_convention_pattern.match(candidate)
        # Group 3 is the family; without a match it is usually just the malware
        # family itself (or an unknown naming convention)
        malware_name = match.group(3) if match else candidate

        # IntelMQ malware.name should be lowercase
        return malware_name.lower()
109+
110+
111+
BOT = ESETStixParserBot

intelmq/tests/bots/parsers/stix/test_parser_bot.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@
2727
}
2828

2929
EXAMPLE_EVENT = {'__type': 'Event',
30-
'feed.name': 'Taxii Feed',
31-
'feed.code': 'feed stix2.1',
32-
'feed.provider': 'Taxii Provider',
33-
'feed.documentation': 'Taxii Documentation',
34-
'feed.accuracy': 100.0,
35-
'feed.url': 'http://localhost/feed',
36-
'source.url': 'http://example.org',
37-
'time.source': '1970-01-01T00:00:00+00:00',
38-
'classification.type': 'undetermined',
39-
'raw': 'eyJpZCI6ICJpbmRpY2F0b3ItLTAiLCAidHlwZSI6ICJpbmRpY2F0b3IiLCAic3BlY192ZXJzaW9uIjogIjIuMSIsICJjcmVhdGVkIjogIjE5NzAtMDEtMDFUMDA6MDA6MDAuMDAwWiIsICJtb2RpZmllZCI6ICIxOTcwLTAxLTAxVDAwOjAwOjAwLjAwMFoiLCAicGF0dGVybiI6ICJbdXJsOnZhbHVlID0gJ2h0dHA6Ly9leGFtcGxlLm9yZyddIiwgInBhdHRlcm5fdHlwZSI6ICJzdGl4IiwgInZhbGlkX2Zyb20iOiAiMTk3MC0wMS0wMVQwMDowMDowMFoifQ=='
40-
}
30+
'feed.name': 'Taxii Feed',
31+
'feed.code': 'feed stix2.1',
32+
'feed.provider': 'Taxii Provider',
33+
'feed.documentation': 'Taxii Documentation',
34+
'feed.accuracy': 100.0,
35+
'feed.url': 'http://localhost/feed',
36+
'source.url': 'http://example.org',
37+
'time.source': '1970-01-01T00:00:00+00:00',
38+
'classification.type': 'undetermined',
39+
'raw': 'eyJpZCI6ICJpbmRpY2F0b3ItLTAiLCAidHlwZSI6ICJpbmRpY2F0b3IiLCAic3BlY192ZXJzaW9uIjogIjIuMSIsICJjcmVhdGVkIjogIjE5NzAtMDEtMDFUMDA6MDA6MDAuMDAwWiIsICJtb2RpZmllZCI6ICIxOTcwLTAxLTAxVDAwOjAwOjAwLjAwMFoiLCAicGF0dGVybiI6ICJbdXJsOnZhbHVlID0gJ2h0dHA6Ly9leGFtcGxlLm9yZyddIiwgInBhdHRlcm5fdHlwZSI6ICJzdGl4IiwgInZhbGlkX2Zyb20iOiAiMTk3MC0wMS0wMVQwMDowMDowMFoifQ=='
40+
}
4141

4242

4343
class TestStixParserBot(test.BotTestCase, unittest.TestCase):
@@ -56,7 +56,6 @@ def test_event(self):
5656
self.run_bot()
5757
self.assertMessageEqual(0, EXAMPLE_EVENT)
5858

59-
6059
def test_pattern_url(self):
6160
""" Test if url pattern is parsed. """
6261
indicator = self.bot_reference.parse_stix_pattern("[url:value = 'http://example.org']")
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# SPDX-FileCopyrightText: 2025 Ladislav Baco
2+
#
3+
# SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
# -*- coding: utf-8 -*-
6+
"""
7+
Test with example reports (STIX objects usually collected from TAXII server)
8+
"""
9+
import unittest
10+
11+
import re
12+
import requests_mock
13+
14+
import intelmq.lib.bot as bot
15+
import intelmq.lib.test as test
16+
from intelmq.bots.parsers.stix.parser_eset import ESETStixParserBot
17+
18+
19+
# Feed metadata that the input report and the expected event have in common.
_FEED_FIELDS = {
    'feed.name': 'Botnet feed',
    'feed.code': 'botnet stix 2.1',
    'feed.provider': 'ESET',
    'feed.documentation': 'https://help.eset.com/eti_portal/en-US/botnet-feed.',
    'feed.accuracy': 100.0,
    'feed.url': 'https://taxii.eset.com/taxii2/643f4eb5-f8b7-46a3-a606-6d61d5ce223a/collections/0abb06690b0b47e49cd7794396b76b20/',
}

# Base64-encoded raw payload; the report and the expected event carry the same value.
_RAW_INDICATOR = 'eyJpZCI6ICJpbmRpY2F0b3ItLTAiLCAidHlwZSI6ICJpbmRpY2F0b3IiLCAic3BlY192ZXJzaW9uIjogIjIuMSIsICJjcmVhdGVkIjogIjE5NzAtMDEtMDFUMDA6MDA6MDAuMDAwWiIsICJtb2RpZmllZCI6ICIxOTcwLTAxLTAxVDAwOjAwOjAwLjAwMFoiLCAicGF0dGVybiI6ICJbdXJsOnZhbHVlID0gJ2h0dHA6Ly9leGFtcGxlLm9yZyddIiwgInBhdHRlcm5fdHlwZSI6ICJzdGl4IiwgInZhbGlkX2Zyb20iOiAiMTk3MC0wMS0wMVQwMDowMDowMFoiLCAiZGVzY3JpcHRpb24iOiAiQyZDIGluZGljYXRlcyB0aGF0IGEgYm90bmV0IFdpbjMyL1NweS5MdW1tYVN0ZWFsZXIuQiB0cm9qYW4gaXMgcHJlc2VudC4iLCAibGFiZWxzIjogWyJtYWxpY2lvdXMtYWN0aXZpdHkiXX0='

# Report as fed into the bot under test.
EXAMPLE_REPORT = {
    '__type': 'Report',
    **_FEED_FIELDS,
    'raw': _RAW_INDICATOR,
}

# Event the bot is expected to produce from EXAMPLE_REPORT.
EXAMPLE_EVENT = {
    '__type': 'Event',
    **_FEED_FIELDS,
    'source.url': 'http://example.org',
    'time.source': '1970-01-01T00:00:00+00:00',
    'classification.type': 'c2-server',
    'malware.name': 'lummastealer',
    'comment': 'C&C indicates that a botnet Win32/Spy.LummaStealer.B trojan is present.',
    'extra.labels': ['malicious-activity'],
    'raw': _RAW_INDICATOR,
}
44+
45+
46+
class TestESETStixParserBot(test.BotTestCase, unittest.TestCase):
    """
    Test case for the ESETStixParserBot.

    Covers the end-to-end report -> event conversion as well as the static
    helpers `classify` and `extract_malware_family`.
    """

    @classmethod
    def set_bot(cls):
        cls.bot_reference = ESETStixParserBot
        cls.sysconfig = {}

    def test_event(self):
        """ Test if correct Event has been produced. """
        self.input_message = EXAMPLE_REPORT
        self.run_bot()
        self.assertMessageEqual(0, EXAMPLE_EVENT)

    def test_classification_by_string(self):
        """ Test if correct classification based on string is returned. """
        # (comment, expected classification.type); no malware name is expected
        expectations = (
            ('Host actively distributes high-severity malicious content in the form of executable code.', 'malware-distribution'),
            ('Host is known source of phishing or other fraudulent content.', 'phishing'),
            ('Host is used as command and control server.', 'c2-server'),
            ('Web services scanning and attacks', 'scanner'),
            ('RDP bruteforce IP', 'brute-force'),
        )
        for comment, expected_type in expectations:
            with self.subTest(comment=comment):
                classification_type, malware_name = self.bot_reference.classify(comment)
                self.assertEqual(str(classification_type), expected_type)
                self.assertEqual(malware_name, None)

    def test_classification_by_regex(self):
        """ Test if correct classification based on regex is returned. """
        # (comment, expected classification.type, expected malware.name)
        expectations = (
            ('C&C indicates that a botnet Win32/Spy.LummaStealer.B trojan is present.', 'c2-server', 'lummastealer'),
            ('C&C of Win32/Spy.LummaStealer.B trojan', 'c2-server', 'lummastealer'),
            ('Host is used as command and control server of Win32/Emotet.BN trojan malware family.', 'c2-server', 'emotet'),
            ('WizardNet backdoor.', 'malware', 'wizardnet'),
            ('Loader for Emotet', 'malware', 'emotet'),
        )
        for comment, expected_type, expected_name in expectations:
            with self.subTest(comment=comment):
                classification_type, malware_name = self.bot_reference.classify(comment)
                self.assertEqual(str(classification_type), expected_type)
                self.assertEqual(str(malware_name), expected_name)

    def test_unknown_classification(self):
        """ Test if undetermined classification is returned when comment contains something unexpected. """
        classification_type, malware_name = self.bot_reference.classify('Example of unexpected comment.')
        self.assertEqual(str(classification_type), 'undetermined')
        self.assertEqual(malware_name, None)

    def test_malware_family_name_extraction(self):
        """ Test if correct malware family name is extracted from the given malware string. """
        # (detection string, expected family name)
        expectations = (
            ('Win32/Spy.LummaStealer.B', 'lummastealer'),
            ('Win32/Rescoms.B', 'rescoms'),
            ('Emotet', 'emotet'),
        )
        for malware, expected_name in expectations:
            with self.subTest(malware=malware):
                malware_name = self.bot_reference.extract_malware_family(malware)
                self.assertEqual(str(malware_name), expected_name)


if __name__ == '__main__':  # pragma: no cover
    unittest.main()

0 commit comments

Comments
 (0)