Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32822 Fix MP protocol error logged indefinitely in loop #19207

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class PeriodicTimer

protected:
cycle_t timePeriodCycles = 0;
cycle_t lastElapsedCycles = 0;
std::atomic<cycle_t> lastElapsedCycles{0};
};


Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jmisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ void throwExceptionIfAborting()

StringBuffer & hexdump2string(byte const * in, size32_t inSize, StringBuffer & out)
{
out.append("[");
out.appendf("%u bytes [", inSize);
byte last = 0;
unsigned seq = 1;
for(unsigned i=0; i<inSize; ++i)
Expand Down
40 changes: 28 additions & 12 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ struct MultiPacketHeader

class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
{
StringAttr msg;
public:
IMPLEMENT_IINTERFACE;

CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
CMPException(MessagePassingError err,const SocketEndpoint &ep, const char *_msg = nullptr) : error(err), endpoint(ep), msg(_msg)
{
}

Expand All @@ -240,6 +241,8 @@ class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
// change it from "MP link closed" to something more helpful
case MPERR_link_closed: str.appendf("Unexpected process termination (ep:%s)",endpoint.getEndpointHostText(tmp).str()); break;
}
if (msg.length())
str.append(" - ").append(msg);
return str;
}
int errorCode() const { return error; }
Expand Down Expand Up @@ -1813,6 +1816,8 @@ class ForwardPacketHandler // TAG_SYS_FORWARD
};


// Rate limiter for reporting MP protocol version mismatches: the packet reader
// only attaches the packet header dump to (and logs) the exception when
// hasElapsed() returns true; otherwise the exception is thrown with logging suppressed.
// NOTE(review): the meaning of the second constructor argument (false) is not
// visible here - confirm against the PeriodicTimer declaration in jdebug.hpp.
static PeriodicTimer periodicTimer(10*60*1000, false); // 10 minutes
// Running total of protocol version mismatches seen by the packet reader;
// reported in the rate-limited log message as "incidents to date".
static std::atomic<__uint64> mpProtocolErrors{0};
// --------------------------------------------------------

class CMPPacketReader: public ISocketSelectNotify, public CInterface
Expand Down Expand Up @@ -1847,7 +1852,8 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
{
if (!parent)
return false;
bool gc = false; // if a gc is hit, then will fall through to close socket
bool closeSocket = false; // if a graceful close is hit, this will be set and will fall through to close socket
bool suppressException = false;
try
{
while (true) // NB: breaks out if blocked (if (remaining) ..)
Expand All @@ -1872,7 +1878,7 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (!gotPacketHdr)
{
CCycleTimer timer;
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
remaining -= szRead;
activeptr += szRead;
if (remaining) // only possible if blocked.
Expand All @@ -1882,10 +1888,20 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100)
{
// TBD IPV6 here
mpProtocolErrors++;
SocketEndpoint ep;
sock->getPeerEndpoint(ep);
IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
throw e;
if (periodicTimer.hasElapsed())
{
VStringBuffer packetHdrBytes("[%" I64F "u incidents to date]. Packet Header: ", mpProtocolErrors.load());
hexdump2string((byte const *)&hdr, sizeof(hdr), packetHdrBytes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be std::min(packetHdrBytes, sizeRead)?

Also, should the number of bytes be logged in the message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is guaranteed to have read exactly sizeof(hdr) bytes at this point; if fewer had been read, remaining would be > 0 and execution would not reach here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, should the number of bytes be logged in the message?

We could make hexdump2string prefix the message with the number of bytes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed hexdump2string to include the number of bytes.

throw new CMPException(MPERR_protocol_version_mismatch, ep, packetHdrBytes.str());
}
else
{
suppressException = true;
throw new CMPException(MPERR_protocol_version_mismatch, ep);
}
}
hdr.setMessageFields(*activemsg);
#ifdef _FULLTRACE
Expand All @@ -1898,9 +1914,9 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
gotPacketHdr = true;
}

if (!gc && remaining)
if (!closeSocket && remaining)
{
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
remaining -= szRead;
activeptr += szRead;
}
Expand Down Expand Up @@ -1939,19 +1955,19 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
}
}
while (activemsg);
if (gc)
if (closeSocket)
break;
}
}
catch (IException *e)
{
if (e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e,"MP(Packet Reader)");
if (!suppressException && e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e, "MP(Packet Reader)");
e->Release();
gotPacketHdr = false;
closeSocket = true; // NB: this select handler will be removed and not be notified again
}

if (gc)
if (closeSocket)
{
// here due to error or graceful close, so close socket (ignore error as may be closed already)
try
Expand Down
Loading