Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drastic performance improvements for reads (#249) #342

Merged
merged 12 commits into from
Dec 11, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ coverage.xml
.ipynb_checkpoints
.vscode
*.ipynb
.idea

# Wercker directories
_builds
Expand Down
6 changes: 3 additions & 3 deletions nptdms/base_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ def _read_data_chunk(self, file, data_objects, chunk_index):
"""
raise NotImplementedError("Data chunk reading must be implemented in base classes")

def read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
def read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk, chunk_size):
johannesloibl marked this conversation as resolved.
Show resolved Hide resolved
""" Read multiple data chunks for a single channel at once
In the base case we read each chunk individually but subclasses can override this
"""
for chunk_index in range(chunk_offset, stop_chunk):
yield self._read_channel_data_chunk(file, data_objects, chunk_index, channel_path)
yield self._read_channel_data_chunk(file, data_objects, chunk_index, channel_path, chunk_size)

def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path):
def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path, chunk_size):
""" Read data from a chunk for a single channel
"""
# In the base case we can read data for all channels
Expand Down
76 changes: 52 additions & 24 deletions nptdms/tdms_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class TdmsSegment(object):
'final_chunk_lengths_override',
'object_index',
'segment_incomplete',
'has_daqmx_objects_cached',
'chunk_size_cached',
'data_objects_cached',
]

def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_incomplete):
Expand All @@ -57,6 +60,9 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_
self.ordered_objects = None
self.object_index = None
self.segment_incomplete = segment_incomplete
self.has_daqmx_objects_cached = None
self.chunk_size_cached = None
self.data_objects_cached = None

def __repr__(self):
return "<TdmsSegment at position %d>" % self.position
Expand Down Expand Up @@ -135,6 +141,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev

if index_cache is not None:
self.object_index = index_cache.get_index(self.ordered_objects)

self._calculate_chunks()
return properties

Expand Down Expand Up @@ -261,18 +268,18 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=

f.seek(self.data_position)

data_objects = [o for o in self.ordered_objects if o.has_data]
chunk_size = self._get_chunk_size()

# Ensure we're working with Python ints as np.int32 values could overflow
# (https://github.com/adamreeve/npTDMS/issues/338)
chunk_size = int(chunk_size)
chunk_offset = int(chunk_offset)

if chunk_offset > 0:
f.seek(chunk_size * chunk_offset, os.SEEK_CUR)
stop_chunk = self.num_chunks if num_chunks is None else num_chunks + chunk_offset
for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
for chunk in self._read_channel_data_chunks(
f, self._get_data_objects(), channel_path, chunk_offset, stop_chunk, chunk_size
):
yield chunk

def _calculate_chunks(self):
Expand Down Expand Up @@ -351,11 +358,15 @@ def _new_segment_object(self, object_path, raw_data_index_header):
return TdmsSegmentObject(object_path)

def _get_chunk_size(self):
if self.chunk_size_cached is not None:
return self.chunk_size_cached

if self._have_daqmx_objects():
return get_daqmx_chunk_size(self.ordered_objects)
return sum(
o.data_size
for o in self.ordered_objects if o.has_data)
self.chunk_size_cached = int(get_daqmx_chunk_size(self.ordered_objects))
return self.chunk_size_cached

self.chunk_size_cached = int(sum(o.data_size for o in self.ordered_objects if o.has_data))
return self.chunk_size_cached

def _read_data_chunks(self, file, data_objects, num_chunks):
""" Read multiple data chunks at once
Expand All @@ -365,13 +376,17 @@ def _read_data_chunks(self, file, data_objects, num_chunks):
for chunk in reader.read_data_chunks(file, data_objects, num_chunks):
yield chunk

def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
def _read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk, chunk_size):
""" Read multiple data chunks for a single channel at once
In the base case we read each chunk individually but subclasses can override this
"""
reader = self._get_data_reader()
for chunk in reader.read_channel_data_chunks(file, data_objects, channel_path, chunk_offset, stop_chunk):
initial_position = file.tell()
for i, chunk in enumerate(reader.read_channel_data_chunks(
file, data_objects, channel_path, chunk_offset, stop_chunk, chunk_size
)):
yield chunk
file.seek(initial_position + (i + 1) * chunk_size)
johannesloibl marked this conversation as resolved.
Show resolved Hide resolved

def _get_data_reader(self):
endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
Expand All @@ -383,6 +398,9 @@ def _get_data_reader(self):
return ContiguousDataReader(self.num_chunks, self.final_chunk_lengths_override, endianness)

def _have_daqmx_objects(self):
if self.has_daqmx_objects_cached is not None:
return self.has_daqmx_objects_cached

data_obj_count = 0
daqmx_count = 0
for o in self.ordered_objects:
Expand All @@ -391,12 +409,12 @@ def _have_daqmx_objects(self):
if isinstance(o, DaqmxSegmentObject):
daqmx_count += 1
if daqmx_count == 0:
return False
if daqmx_count == data_obj_count:
return True
if daqmx_count > 0:
self.has_daqmx_objects_cached = False
elif daqmx_count == data_obj_count:
self.has_daqmx_objects_cached = True
elif daqmx_count > 0:
raise Exception("Cannot read mixed DAQmx and non-DAQmx data")
return False
return self.has_daqmx_objects_cached

def _have_interleaved_data(self):
""" Whether data in this segment is interleaved. Assumes data is not DAQmx.
Expand All @@ -420,6 +438,13 @@ def _have_interleaved_data(self):
else:
raise ValueError("Cannot read interleaved segment containing channels with unsized types")

def _get_data_objects(self):
if self.data_objects_cached is not None:
return self.data_objects_cached

self.data_objects_cached = [o for o in self.ordered_objects if o.has_data]
return self.data_objects_cached


class InterleavedDataReader(BaseDataReader):
""" Reads data in a TDMS segment with interleaved data
Expand All @@ -436,7 +461,7 @@ def read_data_chunks(self, file, data_objects, num_chunks):
raise ValueError("Cannot read interleaved data with different chunk sizes")
return [self._read_interleaved_chunks(file, data_objects, num_chunks)]

def read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk):
def read_channel_data_chunks(self, file, data_objects, channel_path, chunk_offset, stop_chunk, chunk_size):
""" Read multiple data chunks for a single channel at once
"""
num_chunks = stop_chunk - chunk_offset
Expand Down Expand Up @@ -488,28 +513,31 @@ def _read_data_chunk(self, file, data_objects, chunk_index):
object_data[obj.path] = obj.read_values(file, number_values, self.endianness)
return RawDataChunk.channel_data(object_data)

def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path):
def _read_channel_data_chunk(self, file, data_objects, chunk_index, channel_path, chunk_size):
""" Read data from a chunk for a single channel
"""
channel_data = RawChannelDataChunk.empty()
current_position = file.tell()
for obj in data_objects:
number_values = self._get_channel_number_values(obj, chunk_index)
if obj.path == channel_path:
file.seek(current_position)
channel_data = RawChannelDataChunk.channel_data(obj.read_values(file, number_values, self.endianness))
current_position = file.tell()
johannesloibl marked this conversation as resolved.
Show resolved Hide resolved
break
elif number_values == obj.number_values:
# Seek over data for other channel data
file.seek(obj.data_size, os.SEEK_CUR)
else:
current_position += obj.data_size
elif obj.data_type.size is not None:
johannesloibl marked this conversation as resolved.
Show resolved Hide resolved
# In last chunk with reduced chunk size
if obj.data_type.size is None:
# Type is unsized (eg. string), try reading number of values
obj.read_values(file, number_values, self.endianness)
else:
file.seek(obj.data_type.size * number_values, os.SEEK_CUR)
current_position += obj.data_type.size * number_values
else:
raise Exception("Cannot skip over channel with unsized type in a truncated segment")

return channel_data

def _get_channel_number_values(self, obj, chunk_index):
if chunk_index == (self.num_chunks - 1) and self.final_chunk_lengths_override is not None:
johannesloibl marked this conversation as resolved.
Show resolved Hide resolved
if self.final_chunk_lengths_override is not None and chunk_index == (self.num_chunks - 1):
return self.final_chunk_lengths_override.get(obj.path, 0)
else:
return obj.number_values
Expand Down
Loading