diff --git a/tensorizer/serialization.py b/tensorizer/serialization.py
index c534a16..6c4eb10 100644
--- a/tensorizer/serialization.py
+++ b/tensorizer/serialization.py
@@ -321,6 +321,114 @@ def from_io(
         return cls(version_number, feature_flags, tensor_size, tensor_count)


+@dataclasses.dataclass
+class _FileLayout:
+    _VERSION_TAG: ClassVar[int] = 1
+    _NUM_ENTRIES: ClassVar[int] = 3
+    metadata_start: Optional[int] = None
+    metadata_end: Optional[int] = None
+    _METADATA_TAG: ClassVar[int] = 1
+    header_start: Optional[int] = None
+    header_end: Optional[int] = None
+    _HEADER_TAG: ClassVar[int] = 2
+    data_start: Optional[int] = None
+    _DATA_TAG: ClassVar[int] = 3
+
+    _FORMAT: ClassVar[struct.Struct] = struct.Struct(
+        "<"
+        "B"  # Layout version tag
+        "B"  # Number of entries
+        "B"  # Metadata segment tag
+        "Q"  # Metadata segment start
+        "Q"  # Metadata segment end
+        "B"  # Header segment tag
+        "Q"  # Header segment start
+        "Q"  # Header segment end
+        "B"  # Tensor data segment tag
+        "Q"  # Tensor data segment start
+    )
+
+    _LENGTH_FORMAT: ClassVar[struct.Struct] = struct.Struct("<Q")
+    _LENGTH_SIZE: ClassVar[int] = _LENGTH_FORMAT.size
+    _FORMAT_SIZE: ClassVar[int] = _FORMAT.size
+    _FORMAT_SIZE_WITH_LENGTH: ClassVar[int] = _FORMAT_SIZE + _LENGTH_SIZE
+
+    @classmethod
+    def placeholder(cls) -> bytearray:
+        buffer: bytearray = bytearray(cls._FORMAT_SIZE_WITH_LENGTH)
+        cls._LENGTH_FORMAT.pack_into(buffer, 0, cls._FORMAT_SIZE)
+        return buffer
+
+    def pack(self) -> bytearray:
+        if None in (
+            self.metadata_start,
+            self.metadata_end,
+            self.header_start,
+            self.header_end,
+            self.data_start,
+        ):
+            raise RuntimeError("Cannot build; incomplete data")
+
+        buffer: bytearray = self.placeholder()
+
+        self._FORMAT.pack_into(
+            buffer,
+            self._LENGTH_SIZE,
+            self._VERSION_TAG,
+            self._NUM_ENTRIES,
+            self._METADATA_TAG,
+            self.metadata_start,
+            self.metadata_end,
+            self._HEADER_TAG,
+            self.header_start,
+            self.header_end,
+            self._DATA_TAG,
+            self.data_start,
+        )
+
+        return buffer
+
+    @classmethod
+    def parse(cls, buffer: Union[bytearray, bytes]) -> "_FileLayout":
+        (
+            version_tag,
+            num_entries,
+            metadata_tag,
+            metadata_start,
+            metadata_end,
+            header_tag,
+            header_start,
+            header_end,
+            data_tag,
+            data_start,
+        ) = cls._FORMAT.unpack(buffer)
+
+        if version_tag != cls._VERSION_TAG:
+            raise ValueError("Unknown layout version")
+        elif num_entries != cls._NUM_ENTRIES:
+            raise ValueError("Incorrect number of layout entries")
+        elif metadata_tag != cls._METADATA_TAG:
+            raise ValueError("Incorrect metadata segment tag in layout")
+        elif header_tag != cls._HEADER_TAG:
+            raise ValueError("Incorrect header segment tag in layout")
+        elif data_tag != cls._DATA_TAG:
+            raise ValueError("Incorrect data segment tag in layout")
+
+        parsed: cls = cls()
+        parsed.metadata_start = metadata_start
+        parsed.metadata_end = metadata_end
+        parsed.header_start = header_start
+        parsed.header_end = header_end
+        parsed.data_start = data_start
+        return parsed
+
+    @classmethod
+    def from_io(cls, reader) -> "_FileLayout":
+        size: int = cls._LENGTH_FORMAT.unpack(reader.read(cls._LENGTH_SIZE))[0]
+        buffer: bytes = reader.read(size)
+        return cls.parse(buffer)
+
+
 @dataclasses.dataclass(init=False)
 class _TensorHeaderSerializer:
     # Fields with ClassVar are shared across all instances,
@@ -412,7 +520,8 @@ def __init__(
         dtype: bytes,
         shape: Sequence[int],
         data_length: int,
-        file_offset: int,  # location of header in file
+        header_offset: int,  # location of header in the header segment
+        data_offset: int,  # location of tensor data in the data segment
         include_crc32: bool = True,
         include_sha256: bool = True,
         crypt_info: Optional[_crypt_info.CryptInfo] = None,
@@ -423,7 +532,8 @@ def __init__(
         self.shape = shape
         self.dtype = dtype
         self.data_length = data_length
-
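
# Illustrative sketch, not part of the patch: a round trip through the
# _FileLayout record defined above.  The "<Q" length prefix, and therefore
# the 8-byte and 53-byte figures below, come from the reconstructed ClassVar
# constants, so treat them as assumptions; the field values are hypothetical.
layout = _FileLayout(
    metadata_start=128,   # chosen only for illustration
    metadata_end=4096,
    header_start=4096,    # the header segment begins where the metadata ends
    header_end=8192,
    data_start=8192,      # tensor data begins on a block boundary
)
packed = layout.pack()
# 8-byte length prefix + 5 single-byte tags/counts + 5 eight-byte offsets
assert len(packed) == _FileLayout._FORMAT_SIZE_WITH_LENGTH == 8 + 5 + 5 * 8
# Everything after the length prefix parses back into an equal record
assert _FileLayout.parse(packed[_FileLayout._LENGTH_SIZE:]) == layout
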
self.file_offset = file_offset + self.header_offset = header_offset + self.data_offset = data_offset self.include_crc32 = include_crc32 self.include_sha256 = include_sha256 @@ -445,6 +555,13 @@ def __init__( shape_len=self.shape_len, ) ) + self.metadata_entry_segment = struct.Struct( + self.metadata_entry_segment_template.format( + name_len=self.name_len, + dtype_len=self.dtype_len, + shape_len=self.shape_len, + ) + ) crc32_len = sha256_len = self.hash_count = 0 self.has_crc32 = include_crc32 self.has_sha256 = include_sha256 @@ -476,10 +593,6 @@ def __init__( ) ) - def build(self, tensor_data_offset: int): - # tensor_data_offset: location of tensor data in file - self.data_offset = tensor_data_offset - self.buffer = bytearray(self.size) self.start_segment.pack_into( self.buffer, @@ -520,9 +633,7 @@ def build(self, tensor_data_offset: int): self.buffer, self.data_length_offset, self.data_length ) - metadata_entry_segment = self.get_metadata_entry_segment() - - self.metadata_entry = metadata_entry_segment.pack( + self.metadata_entry = self.metadata_entry_segment.pack( self.name_len, # Name length self.name, # Name self.tensor_type.value, # Whether this is a parameter or a buffer @@ -530,21 +641,12 @@ def build(self, tensor_data_offset: int): self.dtype, # Dtype self.shape_len, # Shape length *self.shape, # Shape - self.file_offset, # Header start (relative to the file) - # Tensor data start (relative to the file): + self.header_offset, # Header start (relative to the header segment) + # Tensor data start (relative to the tensor data segment): self.data_offset, self.data_length, # Tensor length ) - def get_metadata_entry_segment(self) -> struct.Struct: - return struct.Struct( - self.metadata_entry_segment_template.format( - name_len=self.name_len, - dtype_len=self.dtype_len, - shape_len=self.shape_len, - ) - ) - def _hashable_segment_views(self): # Skip areas where we store hashes and crypt_info yield memoryview(self.buffer)[: self.hash_header_offset] @@ -1705,6 +1807,11 @@ def __init__( "Tensor is encrypted, but decryption was not requested" ) + # Read the file layout, if present + self._layout: Optional[_FileLayout] = None + if version_number >= HEADERS_AT_TOP_TENSORIZER_VERSION: + self._layout = _FileLayout.from_io(self._file) + # Read the metadata index of tensors. # This is a list of offsets into the file where the per-tensor data # is stored. @@ -1714,6 +1821,11 @@ def __init__( self._file, self._file_header.tensor_count ) ) + if self._layout is not None: + for entry in self._metadata.values(): + entry: TensorEntry + entry.data_offset += self._layout.data_start + entry.offset += self._layout.header_start if not self._metadata: raise ValueError("Tensor index in the file is empty") @@ -3502,26 +3614,87 @@ def __init__( ) self._write(self._file_header.to_bytes()) - self._metadata_start = self._file.tell() - self._metadata_cur = ( - self._metadata_start + 8 - ) # position of next metadata entry we'd write. Leave 8 bytes for metadata length field + self._layout_pos: int = self._file.tell() + self._layout_size: int = self._write(_FileLayout.placeholder()) + self._last_layout: Optional[_FileLayout] = None + + self._flush() # Flush potentially-buffered writes from `_write` calls + + self._metadata_start: int = self._layout_pos + self._layout_size + + # Offset of next metadata entry we'd write, relative to + # the metadata segment (i.e., bytes after self._metadata_start). 
+ # Leave 8 bytes for metadata length field + self._metadata_offset: int = 8 + + # Next header write offset, relative to the header segment + # (i.e., bytes after self._metadata_end). + self._header_offset: int = 0 + + # Next tensor write position relative to the start of the data segment + # (i.e., bytes after self._header_end). + self._tensor_offset: int = 0 + self._metadata_end: Optional[int] = None self._header_end: Optional[int] = None if max_tensors: - # Estimate 256 bytes per metadata entry and 1024 bytes per header entry - self._metadata_end = self._metadata_cur + max_tensors * 256 - # this is less about header_end itself but ensuring that tensor_start is on a 4096-byte aligned boundary - self._header_end = ( - (self._metadata_end + max_tensors * 1024) + 4095 - ) & ~4095 - - self._header_cur = ( - self._metadata_end - ) # is the start of where to write header data, or None - self._tensor_cur = ( - self._header_end - ) # is the start of where to write tensor data. or None + approx_metadata_size: int = 256 * max_tensors + approx_header_size: int = 1024 * max_tensors + + self._metadata_end = self._metadata_start + 8 + approx_metadata_size + # Extend the metadata segment to end on a block boundary + # This allows later manipulation with the fallocate syscall + # on coöperating operating systems to extend this segment + # in the middle of the file + self._metadata_end -= self._metadata_end % -4096 + + self._header_end = self._metadata_end + approx_header_size + # Extend the header segment to end on a block boundary + # This has the same benefits as above, plus positioning + # the following tensor data segment to begin on a block boundary, + # which can allow for more efficient read operations. + self._header_end -= self._header_end % -4096 + + # The layout is static at this point, so it can be written now. + self._update_layout() + + def _tensor_offset_to_file_pos(self, tensor_offset: int) -> int: + if self._header_end is None: + raise RuntimeError( + "The size of the file header isn't known yet," + "so the file position for tensors cannot be determined" + ) + return self._header_end + tensor_offset + + @property + def _tensor_segment_end(self) -> int: + return self._tensor_offset_to_file_pos(self._tensor_offset) + + def _update_layout(self) -> int: + # In case this is called with any of these still unset, + # (e.g., from closing the serializer without writing anything), + # mark those segments as empty + metadata_start = self._metadata_start + metadata_end = self._metadata_end or metadata_start + header_end = self._header_end or metadata_end + + layout: _FileLayout = _FileLayout( + metadata_start=metadata_start, + metadata_end=metadata_end, + header_start=metadata_end, + header_end=header_end, + data_start=header_end, + ) + + if layout != self._last_layout: + size = self._pwrite( + layout.pack(), + self._layout_pos, + verify=self._layout_size, + ) + self._last_layout = layout + return size + return 0 @property def total_tensor_bytes(self): @@ -3546,14 +3719,16 @@ def _sync_prologue_state(self): """ # Write our zero-length field, that indicates that this is the last # tensor. This will be overwritten if another tensor is written. 
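
# Illustrative sketch, not part of the patch: the `x -= x % -4096` pattern
# used above rounds x up to the next 4096-byte boundary (Python's % takes the
# sign of its divisor, so `x % -4096` is zero or negative).  The helper name
# and the positions below are made up for illustration; the last line
# restates the arithmetic done by _tensor_offset_to_file_pos.
def _round_up(x: int, block: int = 4096) -> int:
    return x - x % -block

assert _round_up(0) == 0
assert _round_up(1) == 4096
assert _round_up(4096) == 4096
assert _round_up(4097) == 8192

header_end = _round_up(10_000)      # 12288, a block-aligned header segment end
tensor_offset = 24                  # relative to the start of the data segment
assert header_end + tensor_offset == 12_288 + 24   # absolute file position
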
- self._pwrite(struct.pack(" None: - # We first need to construct the headers so that we know the size of each for w in write_specs: - dtype_bytes = w.dtype.encode("utf-8") # type: ignore + dtype_bytes: bytes = w.dtype.encode("utf-8") if len(dtype_bytes) >= 256: raise ValueError("dtype name length should be less than 256") + # Each individual tensor begins on an 8-byte aligned boundary + self._tensor_offset -= self._tensor_offset % -8 + w.header = _TensorHeaderSerializer( w.module_index, w.tensor_type, @@ -4386,64 +4563,41 @@ def _prepare_for_write_headers( dtype_bytes, w.shape, w.data_length, - 0, # placeholder file_offset + self._header_offset, + self._tensor_offset, include_crc32=w.include_crc32, include_sha256=w.include_sha256, crypt_info=w.crypt_info, ) - # Specify the offsets for each metadata entry - file_offset = ( - self._metadata_cur - ) # position of next metadata entry to write + # Record the offset of this metadata entry + w.metadata_offset = self._metadata_offset - ## metadata - for w in write_specs: - w.metadata_pos = file_offset - file_offset += w.header.get_metadata_entry_segment().size + self._metadata_offset += w.header.metadata_entry_segment.size + self._header_offset += w.header.size + self._tensor_offset += w.data_length - self._metadata_cur = file_offset + # Set or validate self._metadata_end + file_offset: int = self._metadata_start + self._metadata_offset if self._metadata_end is None: - self._metadata_end = self._metadata_cur + # Extend to the end of the current block + file_offset -= file_offset % -4096 + self._metadata_end = file_offset + # Set the end of the metadata to the next block boundary elif file_offset > self._metadata_end: raise RuntimeError("Metadata block is full. Increase max_tensors") - ## headers - if self._header_cur is not None: - if self._header_cur < file_offset: - raise RuntimeError("Somehow wrote past metadata block") - file_offset = self._header_cur - - for w in write_specs: - w.header.file_offset = file_offset - file_offset += w.header.size - - self._header_cur = file_offset + # Set or validate self._header_end + file_offset = self._metadata_end + self._header_offset if self._header_end is None: - self._header_end = self._header_cur - elif self._header_cur > self._header_end: + # Extend to the end of the current block + file_offset -= file_offset % -4096 + self._header_end = file_offset + elif file_offset > self._header_end: raise RuntimeError("Header block is full. 
Increase max_tensors") - ## tensors - if self._tensor_cur is None: - # The block of tensor data starts on a page-aligned boundary - self._tensor_cur = (file_offset + 4095) & ~4095 - else: - if self._tensor_cur < file_offset: - raise RuntimeError("Somehow wrote past header block") - # Each tensor itself begins on an 8-byte aligned boundary - file_offset = (self._tensor_cur + 7) & ~7 - - # file_offset is now where we should start writing tensor data - for w in write_specs: - w.header.build(file_offset) # type: ignore - file_offset += w.data_length - - self._tensor_cur = file_offset - - def _prepare_for_write_meta( - self, write_specs: Sequence[_WriteSpec] - ) -> None: + @staticmethod + def _prepare_for_write_meta(write_specs: Sequence[_WriteSpec]) -> None: for w in write_specs: if not w.tensor.is_meta: continue @@ -4539,50 +4693,52 @@ def do_commit( dependencies: Sequence[_Future], ): # Fast version: makes one buffer containing the size, metadata, and headers, and writes it one go - header_block_size = self._header_cur - self._metadata_start - header_buffer = bytearray(header_block_size) + metadata_start: int = self._metadata_start + metadata_segment_size: int = self._metadata_end - metadata_start + header_block_size: int = metadata_segment_size + self._header_offset + header_buffer: bytearray = bytearray(header_block_size) - metadata_start = self._metadata_start - metadata_size = ( - self._metadata_cur - metadata_start - 8 - ) # 8 bytes for metadata length field - struct.pack_into(" 4096 bytes total from repeating it 25 times + serializer.write_tensor( + i, + f"{long_name_stem}_{i:02d}", + TensorType.PARAM, + torch.zeros((4, 4), dtype=torch.uint8), + ) + serializer.close() def test_too_many(self): # If you set max_tensors too low you'll eventually run out of header space
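
# Illustrative sketch, not part of the patch: the reading side of the new
# layout.  _FileLayout.from_io consumes the length prefix and then the record,
# after which the segment-relative offsets stored in each metadata entry are
# rebased to absolute file positions, mirroring the TensorDeserializer
# changes above.  Positions are hypothetical, and the record size again
# assumes the reconstructed "<Q" length prefix.
import io

written = _FileLayout(
    metadata_start=128,
    metadata_end=4096,
    header_start=4096,
    header_end=8192,
    data_start=8192,
)
read_back = _FileLayout.from_io(io.BytesIO(bytes(written.pack())))
assert read_back == written

# A metadata entry storing header offset 0 and data offset 24 resolves to:
absolute_header_pos = read_back.header_start + 0   # 4096
absolute_data_pos = read_back.data_start + 24      # 8216
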