Skip to content

Commit

Permalink
HPCC-32845 Guard against KJ reading TLKs as regular index parts
Browse files Browse the repository at this point in the history
Check that the last part in an index/subindex is a TLK if the
meta data is missing.
Also remove some redundant 'delayed' functionality.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Oct 28, 2024
1 parent 1e4ca26 commit 8b86f78
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 47 deletions.
3 changes: 2 additions & 1 deletion thorlcr/activities/hashdistrib/thhashdistrib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "thorport.hpp"
#include "thbufdef.hpp"
#include "thexception.hpp"
#include "thormisc.hpp"

#define NUMINPARALLEL 16

Expand Down Expand Up @@ -115,7 +116,7 @@ class IndexDistributeActivityMaster : public HashDistributeMasterBase
checkFormatCrc(this, file, helper->getFormatCrc(), nullptr, helper->getFormatCrc(), nullptr, true);
Owned<IFileDescriptor> fileDesc = file->getFileDescriptor();
Owned<IPartDescriptor> tlkDesc = fileDesc->getPart(fileDesc->numParts()-1);
if (!tlkDesc->queryProperties().hasProp("@kind") || 0 != stricmp("topLevelKey", tlkDesc->queryProperties().queryProp("@kind")))
if (!hasTLK(*file, this))
throw MakeActivityException(this, 0, "Cannot distribute using a non-distributed key: '%s'", scoped.str());
unsigned location;
OwnedIFile iFile;
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/activities/indexwrite/thindexwrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ class IndexWriteActivityMaster : public CMasterActivity
checkFormatCrc(this, _f, helper->getFormatCrc(), nullptr, helper->getFormatCrc(), nullptr, true);
IDistributedFile *f = _f->querySuperFile();
if (!f) f = _f;
Owned<IDistributedFilePart> existingTlk = f->getPart(f->numParts()-1);
if (!existingTlk->queryAttributes().hasProp("@kind") || 0 != stricmp("topLevelKey", existingTlk->queryAttributes().queryProp("@kind")))
if (!hasTLK(*f, this))
throw MakeActivityException(this, 0, "Cannot build new key '%s' based on non-distributed key '%s'", fileName.get(), diName.get());
Owned<IDistributedFilePart> existingTlk = f->getPart(f->numParts()-1);
IPartDescriptor *tlkDesc = fileDesc->queryPart(fileDesc->numParts()-1);
IPropertyTree &props = tlkDesc->queryProperties();
if (existingTlk->queryAttributes().hasProp("@size"))
Expand Down
5 changes: 1 addition & 4 deletions thorlcr/activities/keydiff/thkeydiff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ class CKeyDiffMaster : public CMasterActivity

originalDesc.setown(originalIndexFile->getFileDescriptor());
newIndexDesc.setown(newIndexFile->getFileDescriptor());
Owned<IPartDescriptor> tlkDesc = originalDesc->getPart(originalDesc->numParts()-1);
const char *kind = tlkDesc->queryProperties().queryProp("@kind");
local = NULL == kind || 0 != stricmp("topLevelKey", kind);

local = !hasTLK(*originalIndexFile, this);
if (!local)
width--; // 1 part == No n distributed / Monolithic key
if (width > container.queryJob().querySlaves())
Expand Down
31 changes: 14 additions & 17 deletions thorlcr/activities/keyedjoin/thkeyedjoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,21 @@ class CKeyedJoinMaster : public CMasterActivity
unsigned numParts = fileDesc->numParts();
unsigned nextGroupStartPos = 0;

IDistributedFile *subFile = file;
for (unsigned p=0; p<numParts; p++)
{
IPartDescriptor *part = fileDesc->queryPart(p);
const char *kind = isIndexWithTlk ? part->queryProperties().queryProp("@kind") : nullptr;
if (!kind || !strsame("topLevelKey", kind))
unsigned partIdx = part->queryPartIndex();
unsigned subFileNum = NotFound;
unsigned subPartIdx = partIdx;
if (superFileDesc)
{
superFileDesc->mapSubPart(partIdx, subFileNum, subPartIdx);
partIdx = superWidth*subFileNum+subPartIdx;
subFile = &super->querySubFile(subFileNum, true);
}
if ((1 == numParts) || (subPartIdx < (subFile->numParts()-1)) || !hasTLK(*subFile, nullptr))
{
unsigned partIdx = part->queryPartIndex();
unsigned subfile = NotFound;
unsigned subPartIdx = partIdx;
if (superFileDesc)
{
superFileDesc->mapSubPart(partIdx, subfile, subPartIdx);
partIdx = superWidth*subfile+subPartIdx;
}
if (activity.local)
{
if (activity.queryContainer().queryLocalData())
Expand Down Expand Up @@ -234,7 +235,7 @@ class CKeyedJoinMaster : public CMasterActivity
slaveParts.push_back(p);
}
if (superFileDesc)
partIdx = superWidth*subfile+subPartIdx;
partIdx = superWidth*subFileNum+subPartIdx;
partsByPartIdx.push_back(partIdx);
assertex(partIdx < totalParts);
partToSlave[partIdx] = mappedPos;
Expand Down Expand Up @@ -387,10 +388,7 @@ class CKeyedJoinMaster : public CMasterActivity
ForEach(*iter)
{
IDistributedFile &f = iter->query();
unsigned np = f.numParts()-1;
IDistributedFilePart &part = f.queryPart(np);
const char *kind = part.queryAttributes().queryProp("@kind");
bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey)
bool hasTlk = hasTLK(f, this);
if (first)
{
first = false;
Expand Down Expand Up @@ -419,8 +417,7 @@ class CKeyedJoinMaster : public CMasterActivity
totalIndexParts = indexFile->numParts();
if (totalIndexParts)
{
const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind");
keyHasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind);
keyHasTlk = hasTLK(*indexFile, this);
if (keyHasTlk)
--totalIndexParts;
}
Expand Down
27 changes: 9 additions & 18 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> partBits;
Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy, false);
Owned<IKeyIndex> keyIndex = activity.createPartKeyIndex(partNo, copy);
partKeySet->addIndex(keyIndex.getClear());
}
keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false);
Expand Down Expand Up @@ -2454,7 +2454,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
return tlkKeyIndexes.ordinality();
}
IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy, bool delayed)
IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy)
{
IPartDescriptor &filePart = allIndexParts.item(partNo);
unsigned crc=0;
Expand All @@ -2464,25 +2464,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
StringBuffer filename;
rfn.getPath(filename);

if (delayed)
{
Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr);
Owned<IDelayedFile> delayedFile = createDelayedFile(lazyIFileIO);
return createKeyIndex(filename, crc, *delayedFile, (unsigned) -1, false, 0);
}
else
{
/* NB: createKeyIndex here, will load the key immediately
* But that's okay, because we are only here on demand.
* The underlying IFileIO can later be closed by fhe file caching mechanism.
*/
Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr);
return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0);
}
/* NB: createKeyIndex here, will load the key immediately
* But that's okay, because we are only here on demand.
* The underlying IFileIO can later be closed by fhe file caching mechanism.
*/
Owned<IFileIO> lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr);
return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0);
}
IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx)
{
Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy, false);
Owned<IKeyIndex> keyIndex = createPartKeyIndex(partNo, copy);
return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false);
}
const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz)
Expand Down
6 changes: 1 addition & 5 deletions thorlcr/activities/keypatch/thkeypatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ class CKeyPatchMaster : public CMasterActivity

originalDesc.setown(originalIndexFile->getFileDescriptor());
patchDesc.setown(patchFile->getFileDescriptor());

Owned<IPartDescriptor> tlkDesc = originalDesc->getPart(originalDesc->numParts()-1);
const char *kind = tlkDesc->queryProperties().queryProp("@kind");
local = NULL == kind || 0 != stricmp("topLevelKey", kind);

local = !hasTLK(*originalIndexFile, this);
if (!local && width > 1)
width--; // 1 part == No n distributed / Monolithic key
if (width > container.queryJob().querySlaves())
Expand Down
35 changes: 35 additions & 0 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "jsocket.hpp"
#include "jmutex.hpp"

#include "jhtree.hpp"

#include "commonext.hpp"
#include "dadfs.hpp"
#include "dasds.hpp"
Expand Down Expand Up @@ -1703,3 +1705,36 @@ void saveWuidToFile(const char *wuid)
wuidFileIO->write(0, strlen(wuid), wuid);
wuidFileIO->close();
}

bool hasTLK(IDistributedFile &file, CActivityBase *activity)
{
unsigned np = file.numParts();
IDistributedFilePart &part = file.queryPart(np-1);
bool keyHasTlk = strisame("topLevelKey", part.queryAttributes().queryProp("@kind"));
if (!keyHasTlk)
{
// See HPCC-32845 - check if TLK flag is missing from TLK part
// It is very likely the last part should be a TLK. Even a local key (>1 parts) has a TLK by default (see buildLocalTlks)
if (np>1)
{
RemoteFilename rfn;
part.getFilename(rfn);
StringBuffer filename;
rfn.getPath(filename);
Owned<IKeyIndex> index = createKeyIndex(filename, 0, false, 0);
dbgassertex(index);
if (index->isTopLevelKey())
{
if (activity)
{
Owned<IException> e = MakeActivityException(activity, 0, "TLK file part of file %s is missing kind=\"topLevelKey\" flag. The meta data should be fixed!", file.queryLogicalName());
reportExceptionToWorkunitCheckIgnore(activity->queryJob().queryWorkUnit(), e, SeverityWarning);
StringBuffer errMsg;
UWARNLOG("%s", e->errorMessage(errMsg).str());
}
keyHasTlk = true;
}
}
}
return keyHasTlk;
}
2 changes: 2 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,4 +723,6 @@ class graph_decl CThorPerfTracer : protected PerfTracer

extern graph_decl void saveWuidToFile(const char *wuid);

extern graph_decl bool hasTLK(IDistributedFile &file, CActivityBase *activity);

#endif

0 comments on commit 8b86f78

Please sign in to comment.