Skip to content

Commit

Permalink
HPCC-32845 Guard against KJ reading TLKs as regular index parts
Browse files Browse the repository at this point in the history
Also remove some redundant 'delayed' functionality.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Oct 22, 2024
1 parent 1e4ca26 commit aa52969
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> partBits;
Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy, false);
Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy);
partKeySet->addIndex(keyIndex.getClear());
}
keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false);
Expand Down Expand Up @@ -2454,7 +2454,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
return tlkKeyIndexes.ordinality();
}
IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy, bool delayed)
IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy)
{
IPartDescriptor &filePart = allIndexParts.item(partNo);
unsigned crc=0;
Expand All @@ -2464,25 +2464,20 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
StringBuffer filename;
rfn.getPath(filename);

if (delayed)
{
Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr);
Owned<IDelayedFile> delayedFile = createDelayedFile(lazyIFileIO);
return createKeyIndex(filename, crc, *delayedFile, (unsigned) -1, false, 0);
}
else
{
/* NB: createKeyIndex here, will load the key immediately
* But that's okay, because we are only here on demand.
* The underlying IFileIO can later be closed by fhe file caching mechanism.
*/
Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr);
return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0);
}
/* NB: createKeyIndex here, will load the key immediately
* But that's okay, because we are only here on demand.
* The underlying IFileIO can later be closed by fhe file caching mechanism.
*/
Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr);
Owned<IKeyIndex> index = createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0);
dbgassertex(index);
if (index->isTopLevelKey())
throw MakeActivityException(this, 0, "Invalid key part %u [%s]. Should not be joining against a TLK part.", partNo+1, filename.str());
return index.getClear();
}
IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx)
{
Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy, false);
Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy);
return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false);
}
const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz)
Expand Down

0 comments on commit aa52969

Please sign in to comment.