Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32833 Reuse peerEP from accept when creating socket #19214

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion roxie/udplib/udpsha.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class CSocketSimulator : public CInterfaceOf<ISocket>
virtual size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }

virtual size32_t get_max_send_size() override { UNIMPLEMENTED; }
virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp = nullptr) override { UNIMPLEMENTED; }
virtual ISocket* accept(bool allowcancel=false) override { UNIMPLEMENTED; }
virtual int logPollError(unsigned revents, const char *rwstr) override { UNIMPLEMENTED; }
virtual int wait_read(unsigned timeout) override { UNIMPLEMENTED; }
virtual int wait_write(unsigned timeout) override { UNIMPLEMENTED; }
Expand Down
36 changes: 23 additions & 13 deletions system/jlib/jsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ class CSocket: public ISocket, public CInterface
void shutdown(unsigned mode=SHUTDOWN_READWRITE);
void shutdownNoThrow(unsigned mode);

ISocket* accept(bool allowcancel, SocketEndpoint *peerEp=nullptr);
ISocket* accept(bool allowcancel);
int wait_read(unsigned timeout);
int logPollError(unsigned revents, const char *rwstr);
int wait_write(unsigned timeout);
Expand Down Expand Up @@ -703,7 +703,7 @@ class CSocket: public ISocket, public CInterface
void setTraceName();

CSocket(const SocketEndpoint &_ep,SOCKETMODE smode,const char *name);
CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned);
CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned,SocketEndpoint *_peerEp);

virtual ~CSocket();

Expand Down Expand Up @@ -1254,7 +1254,7 @@ void CSocket::open(int listen_queue_size,bool reuseports)



ISocket* CSocket::accept(bool allowcancel, SocketEndpoint *peerEp)
ISocket* CSocket::accept(bool allowcancel)
{
if ((accept_cancel_state!=accept_not_cancelled) && allowcancel) {
accept_cancel_state=accept_cancelled;
Expand Down Expand Up @@ -1334,10 +1334,9 @@ ISocket* CSocket::accept(bool allowcancel, SocketEndpoint *peerEp)
THROWJSOCKTARGETEXCEPTION(JSOCKERR_cancel_accept);
}

if (peerEp)
getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, *peerEp);

CSocket *ret = new CSocket(newsock,sm_tcp,true);
SocketEndpoint peerEp;
getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, peerEp);
CSocket *ret = new CSocket(newsock,sm_tcp,true,&peerEp);
ret->checkCfgKeepAlive();
ret->set_inherit(false);
ret->set_nonblock(true);
Expand Down Expand Up @@ -3023,7 +3022,7 @@ CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
#endif
}

CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned)
CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned,SocketEndpoint *_peerEp)
{
nonblocking = false;
#ifdef USERECVSEM
Expand All @@ -3045,13 +3044,24 @@ CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned)
accept_cancel_state = accept_not_cancelled;
set_nagle(false);
//set_linger(DEFAULT_LINGER_TIME); -- experiment with removing this as closesocket should still endevour to send outstanding data
char peer[256];
hostport = peer_name(peer,sizeof(peer));
targetip.ipset(peer);
if (_peerEp)
{
targetip = *_peerEp;
hostport = _peerEp->port;
}
else
{
char peer[256];
hostport = peer_name(peer,sizeof(peer));
targetip.ipset(peer);
}
SocketEndpoint ep;
localPort = getEndpoint(ep).port;
#ifdef _TRACE
setTraceName("A!", peer);
StringBuffer tmp;
targetip.getIpText(tmp);
tmp.append(":").append(hostport);
setTraceName("A!", tmp);
#endif
}

Expand Down Expand Up @@ -3148,7 +3158,7 @@ ISocket* ISocket::multicast_connect(const SocketEndpoint &ep, unsigned _ttl)

ISocket* ISocket::attach(int s, bool tcpip)
{
CSocket* sock = new CSocket((SOCKET)s, tcpip?sm_tcp:sm_udp, false);
CSocket* sock = new CSocket((SOCKET)s, tcpip?sm_tcp:sm_udp, false, nullptr);
sock->set_nonblock(true);
return sock;
}
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jsocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class jlib_decl ISocket : extends IInterface
//
// This method is called by server to accept client connection
//
virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp = nullptr) = 0; // not needed for UDP
virtual ISocket* accept(bool allowcancel=false) = 0; // not needed for UDP

//
// log poll() errors
Expand Down
18 changes: 9 additions & 9 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ struct MultiPacketHeader

class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
{
StringAttr msg;
public:
IMPLEMENT_IINTERFACE;

Expand Down Expand Up @@ -254,6 +253,7 @@ class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
private:
MessagePassingError error;
SocketEndpoint endpoint;
StringAttr msg;
};


Expand Down Expand Up @@ -537,19 +537,20 @@ class CMPConnectThread: public Thread
{
CConnectSelectHandler &selectHandler;
Owned<ISocket> sock;
SocketEndpoint peerEP;
StringBuffer peerHostText, peerEndpointText;
ConnectHdr hdr;
cycle_t createTime = 0;
size32_t readSoFar = 0;
CriticalSection crit;
bool closedOrHandled = false;
public:
CSocketHandler(CConnectSelectHandler &_selectHandler, ISocket *_sock, const SocketEndpoint &_peerEP) : selectHandler(_selectHandler), sock(_sock), peerEP(_peerEP)
CSocketHandler(CConnectSelectHandler &_selectHandler, ISocket *_sock) : selectHandler(_selectHandler), sock(_sock)
{
createTime = get_cycles_now();
SocketEndpoint peerEP;
sock->getPeerEndpoint(peerEP);
peerEP.getHostText(peerHostText); // always used by handleAcceptedSocket
peerEndpointText.append(peerEndpointText); // only used if tracing an error
peerEndpointText.append(peerHostText); // only used if tracing an error
if (peerEP.port)
peerEndpointText.append(':').append(peerEP.port);
}
Expand Down Expand Up @@ -692,7 +693,7 @@ class CMPConnectThread: public Thread
maintenanceSem.signal();
maintenanceThread.join();
}
void add(ISocket *sock, const SocketEndpoint &peerEP)
void add(ISocket *sock)
{
while (true)
{
Expand All @@ -707,7 +708,7 @@ class CMPConnectThread: public Thread
MilliSleep(1000);
}

Owned<CSocketHandler> socketHandler = new CSocketHandler(*this, LINK(sock), peerEP);
Owned<CSocketHandler> socketHandler = new CSocketHandler(*this, LINK(sock));

size_t numHandlers;
{
Expand Down Expand Up @@ -2569,10 +2570,9 @@ int CMPConnectThread::run()
while (running)
{
Owned<ISocket> sock;
SocketEndpoint peerEP;
try
{
sock.setown(listensock->accept(true, &peerEP));
sock.setown(listensock->accept(true));
}
catch (IException *e)
{
Expand Down Expand Up @@ -2611,7 +2611,7 @@ int CMPConnectThread::run()
// After that, the socket will be removed from the connectSelectHamndler,
// a CMPChannel will be estalbished, and the socket will be added to the MP CMPPacketReader select handler.
// See handleAcceptedSocket.
connectSelectHandler.add(sock, peerEP);
connectSelectHandler.add(sock);
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion system/security/securesocket/securesocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class CSecureSocket : implements ISecureSocket, public CInterface
//
// This method is called by server to accept client connection
//
virtual ISocket* accept(bool allowcancel=false, SocketEndpoint *peerEp=nullptr) // not needed for UDP
virtual ISocket* accept(bool allowcancel=false) // not needed for UDP
{
throw MakeStringException(-1, "CSecureSocket::accept: not implemented");
}
Expand Down
Loading