Skip to content

Commit

Permalink
Added parser stop after file termination
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Zoppi <[email protected]>
  • Loading branch information
TexZK committed Feb 9, 2024
1 parent 154ac4e commit 07c6f4f
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 54 deletions.
56 changes: 53 additions & 3 deletions src/hexrec/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ class BaseTag:
the *serialized* representation of a record.
"""

_DATA = ...
_DATA: Optional['BaseTag'] = None
r"""Alias to a common data record tag.
This tag is used internally to build a generic data record.
Expand Down Expand Up @@ -503,6 +503,39 @@ def is_data(self) -> bool:
"""
...

# noinspection PyMethodMayBeStatic
def is_file_termination(self) -> bool:
r"""Tells whether this is record tag terminates a record file.
This method returns true if this record is used to terminate a record
file.
This is usually the case for *End Of File* or *start address* records,
depending on the specific file *format*, if supported.
Returns:
bool: This is a file termination tag.
Examples:
>>> from hexrec import IhexFile
>>> record = IhexFile.Record.create_data(123, b'abc')
>>> record.tag.is_file_termination()
False
>>> record = IhexFile.Record.create_end_of_file()
>>> record.tag.is_file_termination()
True
>>> from hexrec import SrecFile
>>> record = SrecFile.Record.create_data(123, b'abc')
>>> record.tag.is_file_termination()
False
>>> record = SrecFile.Record.create_start()
>>> record.tag.is_file_termination()
True
"""

return False


if not __TYPING_HAS_SELF: # pragma: no cover
del Self
Expand Down Expand Up @@ -2989,7 +3022,12 @@ def merge(self, *files: 'BaseFile', clear: bool = False) -> Self:
return self

@classmethod
def parse(cls, stream: IO, ignore_errors: bool = False) -> Self:
def parse(
cls,
stream: IO,
ignore_errors: bool = False,
ignore_after_termination: bool = True,
) -> Self:
r"""Parses records from a byte stream.
It executes :meth:`BaseRecord.parse` for each line of the incoming
Expand All @@ -3009,6 +3047,11 @@ def parse(cls, stream: IO, ignore_errors: bool = False) -> Self:
ignore_errors (bool):
Ignore :class:`Exception` raised by :meth:`BaseRecord.parse`.
ignore_after_termination (bool):
Ignore anything after the termination record was parsed, if
supported (e.g. *End Of File* or *start address* record,
depending on the specific file *format*).
Returns:
:class:`BaseFile`: *self*.
Expand All @@ -3034,23 +3077,30 @@ def parse(cls, stream: IO, ignore_errors: bool = False) -> Self:
{'linear': True, 'maxdatalen': 3, 'startaddr': 51966}
"""

records = []
Record = cls.Record
records = []
row = 0

for line in stream:
row += 1

if cls._is_line_empty(line):
continue

try:
record = Record.parse(line)
except Exception:
if ignore_errors:
continue
raise

record.coords = (row, 0)
records.append(record)

if ignore_after_termination:
if record.tag.is_file_termination():
break

file = cls.from_records(records)
return file

Expand Down
16 changes: 4 additions & 12 deletions src/hexrec/formats/ihex.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import enum
import re
from typing import IO
from typing import Any
from typing import Mapping
from typing import Optional
Expand Down Expand Up @@ -150,6 +149,10 @@ def is_start(self) -> bool:
return ((self == self.START_SEGMENT_ADDRESS) or
(self == self.START_LINEAR_ADDRESS))

def is_file_termination(self) -> bool:

return self.is_eof()


if not __TYPING_HAS_SELF: # pragma: no cover
del Self
Expand Down Expand Up @@ -559,17 +562,6 @@ def linear(self, linear: bool) -> None:
self.discard_records()
self._linear = linear

@classmethod
def parse(
cls,
stream: IO,
ignore_errors: bool = False,
# TODO: ignore_after_termination: bool = True,
) -> Self:

file = super().parse(stream, ignore_errors=ignore_errors)
return _cast(IhexFile, file)

@property
def startaddr(self) -> Optional[int]:
r"""Start address.
Expand Down
12 changes: 10 additions & 2 deletions src/hexrec/formats/mos.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def is_eof(self) -> bool:

return self == self.EOF

def is_file_termination(self) -> bool:

return self.is_eof()


if not __TYPING_HAS_SELF: # pragma: no cover
del Self
Expand Down Expand Up @@ -333,7 +337,7 @@ def parse(
cls,
stream: IO,
ignore_errors: bool = False,
# TODO: ignore_after_termination: bool = True,
ignore_after_termination: bool = True,
eof_record: bool = True,
) -> Self:
r"""Parses records from a byte stream.
Expand All @@ -355,6 +359,9 @@ def parse(
ignore_errors (bool):
Ignore :class:`Exception` raised by :meth:`MosRecord.parse`.
ignore_after_termination (bool):
Ignore anything after the *End Of File* record was parsed.
eof_record (bool):
Interpret the last record as the *End Of File* record.
Expand Down Expand Up @@ -399,7 +406,8 @@ def parse(
data = data[start:endex]
stream = io.BytesIO(data)

file = super().parse(stream, ignore_errors=ignore_errors)
file = super().parse(stream, ignore_errors=ignore_errors,
ignore_after_termination=ignore_after_termination)
file = _cast(MosFile, file)

if eof_record:
Expand Down
15 changes: 4 additions & 11 deletions src/hexrec/formats/srec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import enum
import re
from typing import IO
from typing import Any
from typing import Mapping
from typing import Optional
Expand Down Expand Up @@ -412,6 +411,10 @@ def is_start(self) -> bool:
(self == self.START_24) or
(self == self.START_32))

def is_file_termination(self) -> bool:

return self.is_start()


SIZE_TO_ADDRESS_FORMAT: Mapping[int, bytes] = {
2: b'%04X',
Expand Down Expand Up @@ -877,16 +880,6 @@ def header(self, header: Optional[AnyBytes]) -> None:
self.discard_records()
self._header = header

@classmethod
def parse(
cls, stream: IO,
ignore_errors: bool = False,
# TODO: ignore_after_termination: bool = True,
) -> Self:

file = super().parse(stream, ignore_errors=ignore_errors)
return _cast(SrecFile, file)

@property
def startaddr(self) -> int:
r"""Start address.
Expand Down
16 changes: 4 additions & 12 deletions src/hexrec/formats/xtek.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import enum
import re
from typing import IO
from typing import Any
from typing import Mapping
from typing import Sequence
Expand Down Expand Up @@ -91,6 +90,10 @@ def is_eof(self) -> bool:

return self == self.EOF

def is_file_termination(self) -> bool:

return self.is_eof()


if not __TYPING_HAS_SELF: # pragma: no cover
del Self
Expand Down Expand Up @@ -527,17 +530,6 @@ def apply_records(self) -> Self:
self._startaddr = startaddr
return self

@classmethod
def parse(
cls,
stream: IO,
ignore_errors: bool = False,
# TODO: ignore_after_termination: bool = True,
) -> Self:

file = super().parse(stream, ignore_errors=ignore_errors)
return _cast(XtekFile, file)

@property
def startaddr(self) -> int:
r"""Start address.
Expand Down
4 changes: 2 additions & 2 deletions src/hexrec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def parse_int(
A generic object to convert to integer.
In case `value` is a :obj:`str` (case-insensitive), it can be
either prefixed with ``0x`` or postfixed with ``h`` to convert
from an hexadecimal representation, or prefixed with ``0b`` from
from a hexadecimal representation, or prefixed with ``0b`` from
binary; a prefix of only ``0`` converts from octal.
A further suffix of ``k`` or ``m`` scales as *kibibyte* or
*mebibyte*.
Expand Down Expand Up @@ -243,7 +243,7 @@ def unhexlify(
delete (bytes):
If empty or ``None``, no deletion occurs.
If ``Ellipsis``, :data:``DEFAULT_DELETE`` is used.
If ``Ellipsis``, :data:`DEFAULT_DELETE` is used.
Returns:
bytes: Raw byte string.
Expand Down
4 changes: 4 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ class BaseTestTag:
def test_is_data(self):
...

@abc.abstractmethod
def test_is_file_termination(self):
...


class BaseTestRecord:

Expand Down
5 changes: 5 additions & 0 deletions tests/test_formats_asciihex.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def test_is_data(self):
assert AsciiHexTag.ADDRESS.is_data() is False
assert AsciiHexTag.CHECKSUM.is_data() is False

def test_is_file_termination(self):
assert AsciiHexTag.DATA.is_file_termination() is False
assert AsciiHexTag.ADDRESS.is_file_termination() is False
assert AsciiHexTag.CHECKSUM.is_file_termination() is False


class TestAsciiHexRecord(BaseTestRecord):

Expand Down
57 changes: 49 additions & 8 deletions tests/test_formats_ihex.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,6 @@ def test_is_eof(self):
assert IhexTag.EXTENDED_LINEAR_ADDRESS.is_eof() is False
assert IhexTag.START_LINEAR_ADDRESS.is_eof() is False

def test_is_start(self):
assert IhexTag.DATA.is_start() is False
assert IhexTag.END_OF_FILE.is_start() is False
assert IhexTag.EXTENDED_SEGMENT_ADDRESS.is_start() is False
assert IhexTag.START_SEGMENT_ADDRESS.is_start() is True
assert IhexTag.EXTENDED_LINEAR_ADDRESS.is_start() is False
assert IhexTag.START_LINEAR_ADDRESS.is_start() is True

def test_is_extension(self):
assert IhexTag.DATA.is_extension() is False
assert IhexTag.END_OF_FILE.is_extension() is False
Expand All @@ -84,6 +76,22 @@ def test_is_extension(self):
assert IhexTag.EXTENDED_LINEAR_ADDRESS.is_extension() is True
assert IhexTag.START_LINEAR_ADDRESS.is_extension() is False

def test_is_file_termination(self):
assert IhexTag.DATA.is_file_termination() is False
assert IhexTag.END_OF_FILE.is_file_termination() is True
assert IhexTag.EXTENDED_SEGMENT_ADDRESS.is_file_termination() is False
assert IhexTag.START_SEGMENT_ADDRESS.is_file_termination() is False
assert IhexTag.EXTENDED_LINEAR_ADDRESS.is_file_termination() is False
assert IhexTag.START_LINEAR_ADDRESS.is_file_termination() is False

def test_is_start(self):
assert IhexTag.DATA.is_start() is False
assert IhexTag.END_OF_FILE.is_start() is False
assert IhexTag.EXTENDED_SEGMENT_ADDRESS.is_start() is False
assert IhexTag.START_SEGMENT_ADDRESS.is_start() is True
assert IhexTag.EXTENDED_LINEAR_ADDRESS.is_start() is False
assert IhexTag.START_LINEAR_ADDRESS.is_start() is True


class TestIhexRecord(BaseTestRecord):

Expand Down Expand Up @@ -812,6 +820,39 @@ def test_parse_ignore_errors(self):
file = IhexFile.parse(stream, ignore_errors=True)
assert file._records == records

def test_parse_junk(self):
buffer = (
b':0312340061626391\r\n'
b':02000004ABCD82\r\n'
b':0356780078797AC4\r\n'
b':04000005ABCD5678B1\r\n'
b':00000001FF\r\n'
b'junk\r\nafter'
)
records = [
IhexRecord.create_data(0x1234, b'abc'),
IhexRecord.create_extended_linear_address(0xABCD),
IhexRecord.create_data(0x5678, b'xyz'),
IhexRecord.create_start_linear_address(0xABCD5678),
IhexRecord.create_end_of_file(),
]
with io.BytesIO(buffer) as stream:
file = IhexFile.parse(stream, ignore_after_termination=True)
assert file._records == records

def test_parse_raises_junk(self):
buffer = (
b':0312340061626391\r\n'
b':02000004ABCD82\r\n'
b':0356780078797AC4\r\n'
b':04000005ABCD5678B1\r\n'
b':00000001FF\r\n'
b'junk\r\nafter'
)
with pytest.raises(ValueError, match='syntax error'):
with io.BytesIO(buffer) as stream:
IhexFile.parse(stream, ignore_after_termination=False)

def test_save_file(self, tmppath):
path = str(tmppath / 'test_save_file.hex')
records = [
Expand Down
Loading

0 comments on commit 07c6f4f

Please sign in to comment.