diff --git a/autotest/pymod/gdaltest.py b/autotest/pymod/gdaltest.py index 417a0768181e..e02254740f9d 100755 --- a/autotest/pymod/gdaltest.py +++ b/autotest/pymod/gdaltest.py @@ -2102,87 +2102,8 @@ def reopen(ds, update=False, open_options=None): ) -# VSIFile helper class - - -class VSIFile: - def __init__(self, path, mode, encoding="utf-8"): - self._path = path - self._mode = mode - - self._binary = "b" in mode - self._encoding = encoding - - self._fp = gdal.VSIFOpenExL(self._path, self._mode, True) - if self._fp is None: - raise OSError(gdal.VSIGetLastErrorMsg()) - - self._closed = False - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - def __iter__(self): - return self - - def __next__(self): - line = gdal.CPLReadLineL(self._fp) - if line is None: - raise StopIteration - if self._binary: - return line.encode() - return line - - def close(self): - if self._closed: - return - - self._closed = True - gdal.VSIFCloseL(self._fp) - - def read(self, size=-1): - if size == -1: - pos = self.tell() - self.seek(0, 2) - size = self.tell() - self.seek(pos) - - raw = gdal.VSIFReadL(1, size, self._fp) - - if self._binary: - return bytes(raw) - else: - return raw.decode(self._encoding) - - def write(self, x): - - if self._binary: - assert type(x) in (bytes, bytearray, memoryview) - else: - assert type(x) is str - x = x.encode(self._encoding) - - planned_write = len(x) - actual_write = gdal.VSIFWriteL(x, 1, planned_write, self._fp) - - if planned_write != actual_write: - raise OSError( - f"Expected to write {planned_write} bytes but {actual_write} were written" - ) - - def seek(self, offset, whence=0): - if gdal.VSIFSeekL(self._fp, offset, whence) != 0: - raise OSError(gdal.VSIGetLastErrorMsg()) - - def tell(self): - return gdal.VSIFTellL(self._fp) - - def vsi_open(path, mode="r"): - return VSIFile(path, mode) + return gdal.VSIFile(path, mode) def vrt_has_open_support(): diff --git a/doc/source/spelling_wordlist.txt b/doc/source/spelling_wordlist.txt index 77d3e4e14e6c..4c4ea2d861af 100644 --- a/doc/source/spelling_wordlist.txt +++ b/doc/source/spelling_wordlist.txt @@ -3455,6 +3455,7 @@ vsiaz vsicached vsicrypt vsicurl +VSIFile VSIFOpen vsigs vsigz diff --git a/swig/include/python/gdal_python.i b/swig/include/python/gdal_python.i index 902aa5d6bf50..d6354b63a47e 100644 --- a/swig/include/python/gdal_python.i +++ b/swig/include/python/gdal_python.i @@ -5012,3 +5012,102 @@ def InterpolateAtPoint(self, *args, **kwargs): else: return ret[1] %} + +%pythoncode %{ + +# VSIFile: Copyright (c) 2024, Dan Baston + +from io import BytesIO + +class VSIFile(BytesIO): + """Class wrapping a GDAL VSILFILE instance as a Python BytesIO instance + + :since: GDAL 3.11 + """ + + def __init__(self, path, mode, encoding="utf-8"): + self._path = path + self._mode = mode + + self._binary = "b" in mode + self._encoding = encoding + + self._fp = VSIFOpenExL(self._path, self._mode, True) + if self._fp is None: + raise OSError(VSIGetLastErrorMsg()) + + self._closed = False + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def __iter__(self): + return self + + def __next__(self): + line = CPLReadLineL(self._fp) + if line is None: + raise StopIteration + if self._binary: + return line.encode() + return line + + def close(self): + if self._closed: + return + + self._closed = True + VSIFCloseL(self._fp) + + def read(self, size=-1): + if size == -1: + pos = self.tell() + self.seek(0, 2) + size = self.tell() + self.seek(pos) + + raw = VSIFReadL(1, size, self._fp) + + if self._binary: + return bytes(raw) + else: + return raw.decode(self._encoding) + + def write(self, x): + + if self._binary: + assert type(x) in (bytes, bytearray, memoryview) + else: + assert type(x) is str + x = x.encode(self._encoding) + + planned_write = len(x) + actual_write = VSIFWriteL(x, 1, planned_write, self._fp) + + if planned_write != actual_write: + raise OSError( + f"Expected to write {planned_write} bytes but {actual_write} were written" + ) + + def seek(self, offset, whence=0): + # We redefine the docstring since otherwise breathe would complain on the one coming from BytesIO.seek() + """Change stream position. + + Seek to byte offset pos relative to position indicated by whence: + + - 0: Start of stream (the default). pos should be >= 0; + - 1: Current position - pos may be negative; + - 2: End of stream - pos usually negative. + + Returns the new absolute position. + """ + + if VSIFSeekL(self._fp, offset, whence) != 0: + raise OSError(VSIGetLastErrorMsg()) + + def tell(self): + return VSIFTellL(self._fp) +%}