Skip to content

Commit

Permalink
Merge branch 'parser_feature' of github.com:MISP/misp-stix into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Aug 7, 2024
2 parents 0796d2c + 0816983 commit d3ca8e9
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 23 deletions.
4 changes: 4 additions & 0 deletions misp_stix_converter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ def main():
'-cg', '--cluster_sharing_group', type=int, default=None,
help='Galaxy Clusters sharing group ID in case of External STIX 2 content.'
)
import_parser.add_argument(
'-t', '--title', type=str, default=None,
help='Title prefix to add to the MISP Event `info` field.'
)
import_parser.add_argument(
'-p', '--producer',
help=(
Expand Down
16 changes: 12 additions & 4 deletions misp_stix_converter/misp2stix/misp_to_stix21.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _parse_event_data(self):
list(object_refs) if object_refs
else self._handle_empty_note_refs()
)
self._append_SDO(Note(**note_args))
self._append_SDO(self._create_note(note_args))
self._handle_analyst_data(note_args['id'], event_report)
else:
self._id_parsing_function = {
Expand Down Expand Up @@ -199,7 +199,7 @@ def _handle_empty_object_refs(self, object_id: str, timestamp: datetime):
'created_by_ref': self.identity_id, 'object_refs': [object_id],
'content': 'This MISP Event is empty and contains no attribute, object, galaxy or tag.'
}
self._append_SDO(Note(**note_args))
self._append_SDO(self._create_note(note_args))

def _handle_markings(self, object_args: dict, markings: tuple):
marking_ids = []
Expand Down Expand Up @@ -236,7 +236,9 @@ def _handle_note_data(self, note, object_id: str):
}
if note.get('language'):
note_args['lang'] = note['language']
getattr(self, self._results_handling_function)(Note(**note_args))
getattr(self, self._results_handling_function)(
self._create_note(note_args)
)

def _handle_object_analyst_data(
self, misp_object: Union[MISPObject, dict], object_id: str):
Expand Down Expand Up @@ -799,7 +801,7 @@ def _parse_annotation_object(
values[0] if isinstance(values, list) and len(values) == 1
else values
)
self._append_SDO(Note(**note_args))
self._append_SDO(self._create_note(note_args))
self._handle_object_analyst_data(misp_object, note_id)

def _parse_asn_object_observable(
Expand Down Expand Up @@ -1750,6 +1752,12 @@ def _create_malware(malware_args: dict) -> Malware:
malware_args['is_family'] = False
return Malware(**malware_args)

@staticmethod
def _create_note(note_args: dict) -> Note:
if any(ref.startswith('x-misp-') for ref in note_args['object_refs']):
note_args['allow_custom'] = True
return Note(**note_args)

def _create_observed_data(self, args: dict, observables: list):
args['object_refs'] = [observable.id for observable in observables]
getattr(self, self._results_handling_function)(ObservedData(**args))
Expand Down
15 changes: 9 additions & 6 deletions misp_stix_converter/misp_stix_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ def stix_2_to_misp(filename: _files_type,
output_name: Optional[_files_type]=None,
producer: Optional[str] = None,
sharing_group_id: Optional[int] = None,
single_event: Optional[bool] = False) -> dict:
single_event: Optional[bool] = False,
title: Optional[str] = None) -> dict:
if isinstance(filename, str):
filename = Path(filename).resolve()
try:
Expand All @@ -686,7 +687,7 @@ def stix_2_to_misp(filename: _files_type,
return {'errors': [f'{filename} - {error.__str__()}']}
parser, args = _get_stix2_parser(
_from_misp(bundle.objects), distribution, sharing_group_id,
producer, galaxies_as_tags, organisation_uuid,
title, producer, galaxies_as_tags, organisation_uuid,
cluster_distribution, cluster_sharing_group_id
)
stix_parser = parser(*args)
Expand Down Expand Up @@ -720,7 +721,8 @@ def stix2_to_misp_instance(
organisation_uuid: Optional[str] = MISP_org_uuid,
producer: Optional[str] = None,
sharing_group_id: Optional[int] = None,
single_event: Optional[bool] = False) -> dict:
single_event: Optional[bool] = False,
title: Optional[str] = None) -> dict:
if isinstance(filename, str):
filename = Path(filename).resolve()
try:
Expand All @@ -729,7 +731,7 @@ def stix2_to_misp_instance(
return {'errors': [f'{filename} - {error.__str__()}']}
parser, args = _get_stix2_parser(
_from_misp(bundle.objects), distribution, sharing_group_id,
producer, galaxies_as_tags, organisation_uuid,
title, producer, galaxies_as_tags, organisation_uuid,
cluster_distribution, cluster_sharing_group_id
)
stix_parser = parser(*args)
Expand Down Expand Up @@ -1104,7 +1106,7 @@ def _process_stix_to_misp_files(args) -> dict:
galaxies_as_tags=args.galaxies_as_tags, output_dir=args.output_dir,
organisation_uuid=args.org_uuid, output_name=args.output_name,
producer=args.producer, sharing_group_id=args.sharing_group,
single_event=args.single_event
single_event=args.single_event, title=args.title
)
if traceback.pop('success', 0) == 1:
success.extend(traceback.pop('results'))
Expand Down Expand Up @@ -1141,7 +1143,8 @@ def _process_stix_to_misp_instance(misp: PyMISP, args) -> dict:
debug=args.debug, distribution=args.distribution,
galaxies_as_tags=args.galaxies_as_tags,
organisation_uuid=args.org_uuid, producer=args.producer,
sharing_group_id=args.sharing_group, single_event=args.single_event
sharing_group_id=args.sharing_group,
single_event=args.single_event, title=args.title
)
if traceback.pop('success', 0) == 1:
success.extend(traceback.pop('results'))
Expand Down
3 changes: 2 additions & 1 deletion misp_stix_converter/stix2misp/external_stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
class ExternalSTIX2toMISPParser(STIX2toMISPParser):
def __init__(self, distribution: Optional[int] = 0,
sharing_group_id: Optional[int] = None,
title: Optional[str] = None,
producer: Optional[str] = None,
galaxies_as_tags: Optional[bool] = False,
organisation_uuid: Optional[str] = MISP_org_uuid,
cluster_distribution: Optional[int] = 0,
cluster_sharing_group_id: Optional[int] = None):
super().__init__(
distribution, sharing_group_id, producer, galaxies_as_tags
distribution, sharing_group_id, title, producer, galaxies_as_tags
)
self._set_cluster_distribution(
self._sanitise_distribution(cluster_distribution),
Expand Down
8 changes: 7 additions & 1 deletion misp_stix_converter/stix2misp/importparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def _load_json_file(path):

class STIXtoMISPParser(metaclass=ABCMeta):
def __init__(self, distribution: int, sharing_group_id: Union[int, None],
producer: Union[str, None], galaxies_as_tags: bool):
title: Union[str, None], producer: Union[str, None],
galaxies_as_tags: bool):
self._identifier: str
self.__relationship_types: dict

Expand All @@ -83,6 +84,7 @@ def __init__(self, distribution: int, sharing_group_id: Union[int, None],
self.__sharing_group_id = self._sanitise_sharing_group_id(
sharing_group_id
)
self.__title = title
self.__producer = producer
self.__galaxies_as_tags = self._sanitise_galaxies_as_tags(
galaxies_as_tags
Expand Down Expand Up @@ -133,6 +135,10 @@ def _sanitise_sharing_group_id(
def distribution(self) -> int:
return self.__distribution

@property
def event_title(self) -> Union[str, None]:
return self.__title

@property
def errors(self) -> dict:
return self.__errors
Expand Down
3 changes: 2 additions & 1 deletion misp_stix_converter/stix2misp/internal_stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
class InternalSTIX2toMISPParser(STIX2toMISPParser):
def __init__(self, distribution: Optional[int] = 0,
sharing_group_id: Optional[int] = None,
title: Optional[str] = None,
producer: Optional[str] = None,
galaxies_as_tags: Optional[bool] = False):
super().__init__(
distribution, sharing_group_id, producer, galaxies_as_tags
distribution, sharing_group_id, title, producer, galaxies_as_tags
)
self._mapping = InternalSTIX2toMISPMapping
# parsers
Expand Down
28 changes: 18 additions & 10 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@
_MARKING_DEFINITION_TYPING = Union[
MarkingDefinition_v20, MarkingDefinition_v21
]
_MISP_FEATURES_TYPING = Union[
MISPAttribute, MISPEvent, MISPObject
]
_OBSERVED_DATA_PARSER_TYPING = Union[
ExternalSTIX2ObservedDataConverter, InternalSTIX2ObservedDataConverter
]
Expand Down Expand Up @@ -206,9 +203,10 @@

class STIX2toMISPParser(STIXtoMISPParser, metaclass=ABCMeta):
def __init__(self, distribution: int, sharing_group_id: Union[int, None],
producer: Union[str, None], galaxies_as_tags: bool):
title: Union[str, None], producer: Union[str, None],
galaxies_as_tags: bool):
super().__init__(
distribution, sharing_group_id, producer, galaxies_as_tags
distribution, sharing_group_id, title, producer, galaxies_as_tags
)
self._creators: set = set()
self._mapping: Union[
Expand Down Expand Up @@ -319,10 +317,12 @@ def event_tags(self) -> list:

@property
def generic_info_field(self) -> str:
return (
f'STIX {self.stix_version} Bundle ({self._identifier}) '
'imported with the MISP-STIX import feature.'
)
message = f'STIX {self.stix_version} Bundle ({self._identifier})'
if self.event_title is not None:
message = f'{self.event_title} {message}'
if self.producer is not None:
message += f' produced by {self.producer}'
return f'{message} and converted with the MISP-STIX import feature.'

@property
def identity_parser(self) -> _IDENTITY_PARSER_TYPING:
Expand Down Expand Up @@ -1154,7 +1154,7 @@ def _create_misp_event(
misp_event = MISPEvent(force_timestamps=True)
self._sanitise_object_uuid(misp_event, stix_object.id)
event_args = {
'info': getattr(stix_object, 'name', self.generic_info_field),
'info': self._generate_info_field(stix_object),
'distribution': self.distribution,
'timestamp': self._timestamp_from_date(stix_object.modified)
}
Expand Down Expand Up @@ -1202,6 +1202,14 @@ def _handle_tags_from_stix_fields(self, stix_object: _SDO_TYPING):
def _extract_uuid(object_id: str) -> str:
return object_id.split('--')[-1]

def _generate_info_field(self, stix_object: _GROUPING_REPORT_TYPING) -> str:
if hasattr(stix_object, 'name'):
title = stix_object.name
if self.event_title is not None:
title = f'{self.event_title} {title}'
return title
return self.generic_info_field

def _handle_creator(self, reference: str) -> str:
if reference in getattr(self, '_identity', {}):
return self._identity[reference].name
Expand Down

0 comments on commit d3ca8e9

Please sign in to comment.