From 0e0b94920958e66b9372ccbed28533ebca147ae3 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 4 Jul 2024 16:32:30 +0200 Subject: [PATCH] ENG: Add support for creating separated MISP Events With `event_separator` parameter, user can decide to create more than one MISP event in the output bot and group incoming messages based on given field. In addition, the message library was fixed not to modify the parameter directly. --- CHANGELOG.md | 5 +- docs/user/bots.md | 10 +- intelmq/bots/outputs/misp/output_feed.py | 148 +++++++++++------- intelmq/lib/message.py | 13 +- .../bots/outputs/misp/test_output_feed.py | 89 +++++++++-- 5 files changed, 191 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fae4dd41c..1d9a416a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,8 +36,9 @@ #### Outputs - `intelmq.bots.outputs.misp.output_feed`: - Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). - - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2505 by Kamil Mankowski). - - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR by Kamil Mankowski). + - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski). + - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski). + - Add `event_separator` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). 
### Documentation diff --git a/docs/user/bots.md b/docs/user/bots.md index 3d2d64e4d..ce5c9eb4d 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4605,9 +4605,9 @@ a new MISP event based on `interval_event` triggers saving regardless of the cac **`attribute_mapping`** (optional, dict) If set, allows selecting which IntelMQ event fields are mapped to MISP attributes -as well as attribute parameters (like e.g. a comment). The expected format is a *dictonary of dictionaries*: +as well as attribute parameters (like e.g. a comment). The expected format is a *dictionary of dictionaries*: first-level key represents an IntelMQ field that will be directly translated to a MISP attribute; nested -dictionary represents addditional parameters PyMISP can take when creating an attribute. They can use +dictionary represents additional parameters PyMISP can take when creating an attribute. They can use names of other IntelMQ fields (then the value of such field will be used), or static values. If not needed, leave empty dict. @@ -4627,6 +4627,12 @@ and set their values as in the IntelMQ event. In addition, the `feed.name` would as given in the `event_description.text` from IntelMQ event, and `destination.ip` would be set as not usable for IDS. +**`event_separator`** + +(optional, string): If set to a field name from IntelMQ event, the bot will group incoming messages +in separated MISP events, based on the value of this field. The `interval_event` parameter applies +to all grouped events together. + **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server. 
diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 878858cea..a0ef88239 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -9,22 +9,22 @@ from pathlib import Path from uuid import uuid4 -import pymisp - from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError -from ....lib.message import Message, MessageFactory +from ....lib.message import MessageFactory from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: - from pymisp import MISPEvent, MISPOrganisation, NewAttributeError + from pymisp import MISPEvent, MISPObject, MISPOrganisation, NewAttributeError from pymisp.tools import feed_meta_generator except ImportError: # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None import_fail_reason = "import" +DEFAULT_KEY = "default" + class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" @@ -38,6 +38,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): ) _is_multithreadable: bool = False attribute_mapping: dict = None + event_separator: str = None @staticmethod def check_output_dir(dirname): @@ -50,7 +51,8 @@ def init(self): if MISPEvent is None: raise MissingDependencyError("pymisp", version=">=2.4.117.3") - self.current_event = None + self.current_events = {} + self.current_files = {} self.misp_org = MISPOrganisation() self.misp_org.name = self.misp_org_name @@ -66,58 +68,57 @@ def init(self): minutes=parse_relative(self.interval_event) ) + self.min_time_current = datetime.datetime.max + self.max_time_current = datetime.datetime.min + if (self.output_dir / ".current").exists(): try: with (self.output_dir / ".current").open() as f: - self.current_file = Path(f.read()) - - if self.current_file.exists(): - self.current_event = MISPEvent() - self.current_event.load_file(self.current_file) - - last_min_time, 
last_max_time = re.findall( - "IntelMQ event (.*) - (.*)", self.current_event.info - )[0] - last_min_time = datetime.datetime.strptime( - last_min_time, "%Y-%m-%dT%H:%M:%S.%f" - ) - last_max_time = datetime.datetime.strptime( - last_max_time, "%Y-%m-%dT%H:%M:%S.%f" - ) - if last_max_time < datetime.datetime.now(): - self.min_time_current = datetime.datetime.now() - self.max_time_current = self.min_time_current + self.timedelta - self.current_event = None - else: - self.min_time_current = last_min_time - self.max_time_current = last_max_time - except: + current = f.read() + + if not self.event_separator: + self.current_files[DEFAULT_KEY] = Path(current) + else: + self.current_files = { + k: Path(v) for k, v in json.loads(current).items() + } + + for key, path in self.current_files.items(): + self._load_event(path, key) + except Exception: self.logger.exception( - "Loading current event %s failed. Skipping it.", self.current_event + "Loading current events %s failed. Skipping it.", self.current_files ) - self.current_event = None - else: + self.current_events = {} + + if not self.current_files or self.max_time_current < datetime.datetime.now(): self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta + self.current_events = {} + + def _load_event(self, file_path: Path, key: str): + if file_path.exists(): + self.current_events[key] = MISPEvent() + self.current_events[key].load_file(file_path) + + last_min_time, last_max_time = re.findall( + "IntelMQ event (.*) - (.*)", self.current_events[key].info + )[0] + last_min_time = datetime.datetime.strptime( + last_min_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + last_max_time = datetime.datetime.strptime( + last_max_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + + self.min_time_current = min(last_min_time, self.min_time_current) + self.max_time_current = max(last_max_time, self.max_time_current) def process(self): - if not self.current_event or datetime.datetime.now() > self.max_time_current: 
+ if datetime.datetime.now() > self.max_time_current: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta - self.current_event = MISPEvent() - self.current_event.info = "IntelMQ event {begin} - {end}" "".format( - begin=self.min_time_current.isoformat(), - end=self.max_time_current.isoformat(), - ) - self.current_event.set_date(datetime.date.today()) - self.current_event.Orgc = self.misp_org - self.current_event.uuid = str(uuid4()) - self.current_file = self.output_dir / f"{self.current_event.uuid}.json" - with (self.output_dir / ".current").open("w") as f: - f.write(str(self.current_file)) - - # On startup or when timeout occurs, clean the queue to ensure we do not - # keep events forever because there was not enough generated + self._generate_feed() event = self.receive_message().to_dict(jsondict_as_string=True) @@ -128,19 +129,57 @@ def process(self): if cache_size is None: self._generate_feed(event) + elif not self.current_events: + # Always create the first event so we can keep track of the interval. 
+ # It also ensures cleaning the queue after startup in case of awaiting + # messages from the previous run + self._generate_feed() elif cache_size >= self.bulk_save_count: self._generate_feed() self.acknowledge_message() + def _generate_new_event(self, key): + self.current_events[key] = MISPEvent() + self.current_events[key].info = "IntelMQ event {begin} - {end}" "".format( + begin=self.min_time_current.isoformat(), + end=self.max_time_current.isoformat(), + ) + self.current_events[key].set_date(datetime.date.today()) + self.current_events[key].Orgc = self.misp_org + self.current_events[key].uuid = str(uuid4()) + self.current_files[key] = ( + self.output_dir / f"{self.current_events[key].uuid}.json" + ) + with (self.output_dir / ".current").open("w") as f: + if not self.event_separator: + f.write(str(self.current_files[key])) + else: + json.dump({k: str(v) for k, v in self.current_files.items()}, f) + return self.current_events[key] + def _add_message_to_feed(self, message: dict): - obj = self.current_event.add_object(name="intelmq_event") + if not self.event_separator: + key = DEFAULT_KEY + else: + # For proper handling of nested fields + message_obj = MessageFactory.from_dict( + message, harmonization=self.harmonization, default_type="Event" + ) + key = message_obj.get(self.event_separator) or DEFAULT_KEY + + if key in self.current_events: + event = self.current_events[key] + else: + event = self._generate_new_event(key) + + obj = event.add_object(name="intelmq_event") if not self.attribute_mapping: self._default_mapping(obj, message) else: self._custom_mapping(obj, message) - def _default_mapping(self, obj: pymisp.MISPObject, message: dict): + def _default_mapping(self, obj: "MISPObject", message: dict): for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) @@ -162,15 +201,15 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic for parameter, value in definition.items(): # Check if the 
value is a harmonization key or a static value if isinstance(value, str) and ( - value in self.harmonization["event"] - or value.split(".", 1)[0] in self.harmonization["event"] + value in self.harmonization["event"] or + value.split(".", 1)[0] in self.harmonization["event"] ): result[parameter] = message.get(value) else: result[parameter] = value return result - def _custom_mapping(self, obj: pymisp.MISPObject, message: dict): + def _custom_mapping(self, obj: "MISPObject", message: dict): for object_relation, definition in self.attribute_mapping.items(): obj.add_attribute( object_relation, @@ -188,9 +227,10 @@ def _generate_feed(self, message: dict = None): self._add_message_to_feed(message) message = self.cache_pop() - feed_output = self.current_event.to_feed(with_meta=False) - with self.current_file.open("w") as f: - json.dump(feed_output, f) + for key, event in self.current_events.items(): + feed_output = event.to_feed(with_meta=False) + with self.current_files[key].open("w") as f: + json.dump(feed_output, f) feed_meta_generator(self.output_dir) diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index e99e22731..84ee60a52 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -48,17 +48,18 @@ def from_dict(message: dict, harmonization=None, MessageFactory.unserialize MessageFactory.serialize """ - if default_type and "__type" not in message: - message["__type"] = default_type + # don't modify the parameter + message_copy = message.copy() + + if default_type and "__type" not in message_copy: + message_copy["__type"] = default_type try: - class_reference = getattr(intelmq.lib.message, message["__type"]) + class_reference = getattr(intelmq.lib.message, message_copy["__type"]) except AttributeError: raise exceptions.InvalidArgument('__type', - got=message["__type"], + got=message_copy["__type"], expected=VALID_MESSSAGE_TYPES, docs=HARMONIZATION_CONF_FILE) - # don't modify the parameter - message_copy = message.copy() del message_copy["__type"] 
return class_reference(message_copy, auto=True, harmonization=harmonization) diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index abb4b9c36..31172a81b 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -70,18 +70,19 @@ def test_accumulating_events(self): current_event = open(f"{self.directory.name}/.current").read() - # First, the feed is empty - not enough events came + # The first event is always immediately dumped to the MISP feed + # But the second wait until bulk saving size is achieved with open(current_event) as f: objects = json.load(f).get("Event", {}).get("Object", []) - assert len(objects) == 0 + assert len(objects) == 1 - self.input_message = [EXAMPLE_EVENT] - self.run_bot(parameters={"bulk_save_count": 3}) + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=2, parameters={"bulk_save_count": 3}) # When enough events were collected, save them with open(current_event) as f: objects = json.load(f)["Event"]["Object"] - assert len(objects) == 3 + assert len(objects) == 4 self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT] self.run_bot(iterations=3, parameters={"bulk_save_count": 3}) @@ -89,17 +90,19 @@ def test_accumulating_events(self): # We continue saving to the same file until interval timeout with open(current_event) as f: objects = json.load(f)["Event"]["Object"] - assert len(objects) == 6 + assert len(objects) == 7 # Simulating leftovers in the queue when it's time to generate new event Path(f"{self.directory.name}/.current").unlink() - self.bot.cache_put(MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True)) + self.bot.cache_put( + MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True) + ) self.run_bot(parameters={"bulk_save_count": 3}) new_event = open(f"{self.directory.name}/.current").read() with open(new_event) as f: 
objects = json.load(f)["Event"]["Object"] - assert len(objects) == 1 + assert len(objects) == 2 def test_attribute_mapping(self): self.run_bot( @@ -108,7 +111,7 @@ def test_attribute_mapping(self): "source.ip": {}, "feed.name": {"comment": "event_description.text"}, "destination.ip": {"to_ids": False}, - "malware.name": {"comment": "extra.non_ascii"} + "malware.name": {"comment": "extra.non_ascii"}, } } ) @@ -133,7 +136,9 @@ def test_attribute_mapping(self): assert feed_name["comment"] == EXAMPLE_EVENT["event_description.text"] destination_ip = next( - attr for attr in attributes if attr.get("object_relation") == "destination.ip" + attr + for attr in attributes + if attr.get("object_relation") == "destination.ip" ) assert destination_ip["value"] == EXAMPLE_EVENT["destination.ip"] assert destination_ip["to_ids"] is False @@ -144,6 +149,70 @@ def test_attribute_mapping(self): assert malware_name["value"] == EXAMPLE_EVENT["malware.name"] assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"] + def test_event_separation(self): + self.input_message = [ + EXAMPLE_EVENT, + {**EXAMPLE_EVENT, "malware.name": "another_malware"}, + EXAMPLE_EVENT, + ] + self.run_bot(iterations=3, parameters={"event_separator": "malware.name"}) + + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 2 + + with open(current_events["salityp2p"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 2 + malware_name = next( + attr["value"] + for attr in objects[0]["Attribute"] + if attr.get("object_relation") == "malware.name" + ) + assert malware_name == "salityp2p" + + with open(current_events["another_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + malware_name = next( + attr["value"] + for attr in objects[0]["Attribute"] + if attr.get("object_relation") == "malware.name" + ) + assert malware_name == "another_malware" + + def 
test_event_separation_with_extra_and_bulk_save(self): + self.input_message = [ + {**EXAMPLE_EVENT, "extra.some_key": "another_malware"}, + {**EXAMPLE_EVENT, "extra.some_key": "first_malware"}, + {**EXAMPLE_EVENT, "extra.some_key": "another_malware"}, + ] + self.run_bot( + iterations=3, + parameters={"event_separator": "extra.some_key", "bulk_save_count": 3}, + ) + + # Only the initial event is saved, the rest is cached + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 1 + with open(current_events["another_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + + self.input_message = {**EXAMPLE_EVENT, "extra.some_key": "first_malware"} + self.run_bot( + parameters={"event_separator": "extra.some_key", "bulk_save_count": 3}, + ) + + # Now everything is saved + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 2 + with open(current_events["another_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 2 + + with open(current_events["first_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 2 def tearDown(self): self.cache.delete(self.bot_id)