diff --git a/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py b/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py index 7001a703d63..d3fb1e618eb 100755 --- a/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py +++ b/src/DIRAC/Core/DISET/private/Transports/BaseTransport.py @@ -42,7 +42,7 @@ class BaseTransport: def __init__(self, stServerAddress, bServerMode=False, **kwargs): self.bServerMode = bServerMode self.extraArgsDict = kwargs - self.byteStream = b"" + self.byteStream = bytearray() self.packetSize = 1048576 # 1MiB self.stServerAddress = stServerAddress self.peerCredentials = {} @@ -191,7 +191,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal maxBufferSize = max(maxBufferSize, 0) try: # Look either for message length of keep alive magic string - iSeparatorPosition = self.byteStream.find(b":", 0, 10) + iSeparatorPosition = self.byteStream.find(b":") keepAliveMagicLen = len(BaseTransport.keepAliveMagic) isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0 # While not found the message length or the ka, keep receiving @@ -204,9 +204,10 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal if not retVal["Value"]: return S_ERROR("Peer closed connection") # New data! - self.byteStream += retVal["Value"] - # Look again for either message length of ka magic string - iSeparatorPosition = self.byteStream.find(b":", 0, 10) + self.byteStream.extend(retVal["Value"]) + + # Look again for either message length or keep alive magic string + iSeparatorPosition = self.byteStream.find(b":") isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0 # Over the limit? if maxBufferSize and len(self.byteStream) > maxBufferSize and iSeparatorPosition == -1: @@ -214,7 +215,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal # Keep alive magic! if isKeepAlive: gLogger.debug("Received keep alive header") - # Remove the ka magic from the buffer and process the keep alive + # Remove the keep-alive magic from the buffer and process the keep-alive self.byteStream = self.byteStream[keepAliveMagicLen:] return self.__processKeepAlive(maxBufferSize, blockAfterKeepAlive) # From here it must be a real message! @@ -225,7 +226,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal if readSize >= pkgSize: # If we already have all the data we need data = pkgData[:pkgSize] - self.byteStream = pkgData[pkgSize:] + self.byteStream = self.byteStream[pkgSize + iSeparatorPosition + 1 :] else: # If we still need to read stuff pkgMem = BytesIO() @@ -245,11 +246,12 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal # Data is here! take it out from the bytestream, dencode and return if readSize == pkgSize: data = pkgMem.getvalue() - self.byteStream = b"" - else: # readSize > pkgSize: + self.byteStream = bytearray() # Reset the byteStream + else: pkgMem.seek(0, 0) data = pkgMem.read(pkgSize) - self.byteStream = pkgMem.read() + self.byteStream = bytearray(pkgMem.read()) # store the rest in bytearray + try: data = MixedEncode.decode(data)[0] except Exception as e: diff --git a/src/DIRAC/Core/Utilities/DEncode.py b/src/DIRAC/Core/Utilities/DEncode.py index 3a20b868192..bae1a2179c1 100755 --- a/src/DIRAC/Core/Utilities/DEncode.py +++ b/src/DIRAC/Core/Utilities/DEncode.py @@ -521,7 +521,7 @@ def decode(data): if not data: return data # print("DECODE FUNCTION : %s" % g_dDecodeFunctions[ sStream [ iIndex ] ]) - if not isinstance(data, bytes): + if not isinstance(data, (bytes, bytearray)): raise NotImplementedError("This should never happen") return g_dDecodeFunctions[data[0]](data, 0)