From f896d5e9337ea57405ad2babc22141e5243380d5 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Fri, 25 Oct 2024 12:45:47 +0100 Subject: [PATCH] HPCC-32111 Use compression in global merge activity Signed-off-by: Jake Smith --- thorlcr/activities/merge/thmergeslave.cpp | 20 +++++++++++++++++--- thorlcr/msort/tsorta.cpp | 4 ++-- thorlcr/msort/tsorta.hpp | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/thorlcr/activities/merge/thmergeslave.cpp b/thorlcr/activities/merge/thmergeslave.cpp index 3dbfed026af..4defeb41458 100644 --- a/thorlcr/activities/merge/thmergeslave.cpp +++ b/thorlcr/activities/merge/thmergeslave.cpp @@ -47,6 +47,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity offset_t *partitionpos; size32_t chunkmaxsize; unsigned width; + unsigned rwFlags = 0x0; // flags for streams (e.g. compression flags) class cRemoteStream : implements IRowStream, public CSimpleInterface { @@ -220,7 +221,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity CThorKeyArray partition(*this, queryRowInterfaces(this),helper->querySerialize(),helper->queryCompare(),helper->queryCompareKey(),helper->queryCompareRowKey()); partition.deserialize(mb,false); - partition.calcPositions(tmpfile,sample); + partition.calcPositions(tmpfile, sample, rwFlags); partitionpos = new offset_t[width]; unsigned i; for (i=0;i writer = createRowWriter(tmpfile, this); + Owned writer = createRowWriter(tmpfile, this, rwFlags); CThorKeyArray sample(*this, this, helper->querySerialize(), helper->queryCompare(), helper->queryCompareKey(), helper->queryCompareRowKey()); sample.setSampling(MERGE_TRANSFER_BUFFER_SIZE); ActPrintLog("MERGE: start gather"); @@ -366,7 +380,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity offset_t end = partitionpos[idx]; if (pos>=end) return 0; - Owned rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end); // this is not good + Owned rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end, (unsigned __int64)-1, rwFlags); // this is not good offset_t so = rs->getOffset(); size32_t len = 0; size32_t chunksize = chunkmaxsize; diff --git a/thorlcr/msort/tsorta.cpp b/thorlcr/msort/tsorta.cpp index 861f504786c..866ff37ae25 100644 --- a/thorlcr/msort/tsorta.cpp +++ b/thorlcr/msort/tsorta.cpp @@ -443,7 +443,7 @@ offset_t CThorKeyArray::findLessRowPos(const void * row) return getFixedFilePos(p); } -void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample) +void CThorKeyArray::calcPositions(IFile *file, CThorKeyArray &sample, unsigned rwFlags) { // calculates positions based on sample // not fast! @@ -459,7 +459,7 @@ void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample) if (pos==(offset_t)-1) pos = 0; // should do bin-chop for fixed length but initially do sequential search - Owned s = createRowStreamEx(file, rowif, pos); + Owned s = createRowStreamEx(file, rowif, pos, 0, (offset_t)-1, rwFlags); for (;;) { OwnedConstThorRow rowcmp = s->nextRow(); diff --git a/thorlcr/msort/tsorta.hpp b/thorlcr/msort/tsorta.hpp index ae8b1fe20d8..30097801299 100644 --- a/thorlcr/msort/tsorta.hpp +++ b/thorlcr/msort/tsorta.hpp @@ -114,7 +114,7 @@ class THORSORT_API CThorKeyArray void deserialize(MemoryBuffer &mb,bool append); void sort(); void createSortedPartition(unsigned pn); - void calcPositions(IFile *file,CThorKeyArray &sample); + void calcPositions(IFile *file, CThorKeyArray &sample, unsigned rwFlags); void setSampling(size32_t _maxsamplesize, unsigned _divisor=0); int keyCompare(unsigned a,unsigned b); offset_t getFixedFilePos(unsigned i);