Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix TcpDecoder memory leak #54

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ public NettyChannel(io.netty.channel.Channel channel, ProtocolConfig config) {
this.config = config;
if (channel != null) {
// can't get the remote address while using udp, so the remoteAddress is null
this.remoteAddress = ((InetSocketAddress) channel.remoteAddress());
this.localAddress = (InetSocketAddress) channel.localAddress();
if (channel.remoteAddress() instanceof InetSocketAddress) {
this.remoteAddress = ((InetSocketAddress) channel.remoteAddress());
}
if (channel.localAddress() instanceof InetSocketAddress) {
this.localAddress = (InetSocketAddress) channel.localAddress();
}
}
// listen for the close event
if (channel != null && channel.closeFuture() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ private void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
}
}
} while (message.isReadable());
} catch (Exception e) {
message.skipBytes(message.readableBytes());
JoeCqupt marked this conversation as resolved.
Show resolved Hide resolved
throw new TransportException("tcp|decode failure", e);
} finally {
NettyChannelManager.removeChannelIfDisconnected(ctx.channel());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.tencent.trpc.transport.netty;

import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.ErrorCode;
import com.tencent.trpc.core.exception.TRpcException;
import com.tencent.trpc.core.exception.TransportException;
import com.tencent.trpc.core.transport.codec.Codec;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderException;
import org.junit.Assert;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;


public class NettyCodecAdapterTest {

@Test
public void testTcpDecodeIllegalPacket1() {
JoeCqupt marked this conversation as resolved.
Show resolved Hide resolved
Codec codec = mock(Codec.class);
doThrow(TRpcException.newFrameException(ErrorCode.TRPC_CLIENT_DECODE_ERR, "the request protocol is not trpc"))
.when(codec).decode(any(), any());


ProtocolConfig protocolConfig = new ProtocolConfig();
// set batchDecoder true
protocolConfig.setBatchDecoder(true);
NettyCodecAdapter nettyCodecAdapter = NettyCodecAdapter.createTcpCodecAdapter(codec, protocolConfig);

ChannelHandler decoder = nettyCodecAdapter.getDecoder();
EmbeddedChannel embeddedChannel = new EmbeddedChannel();
embeddedChannel.pipeline().addLast(decoder);

ByteBuf byteBuf = AbstractByteBufAllocator.DEFAULT.heapBuffer();
byteBuf.writeBytes("testTcpDecodeIllegalPacket1".getBytes(StandardCharsets.UTF_8));

// write illegal packet
EmbeddedChannel tmpEmbeddedChannel = embeddedChannel;
DecoderException decoderException = Assert.assertThrows(DecoderException.class, () -> {
tmpEmbeddedChannel.writeInbound(byteBuf);
});

Assert.assertTrue(decoderException.getCause() instanceof TransportException);
Assert.assertEquals(byteBuf.refCnt(), 0);
}

@Test
public void testTcpDecodeIllegalPacket2() {
Codec codec = mock(Codec.class);
doThrow(TRpcException.newFrameException(ErrorCode.TRPC_CLIENT_DECODE_ERR, "the request protocol is not trpc"))
.when(codec).decode(any(), any());


ProtocolConfig protocolConfig = new ProtocolConfig();
// set batchDecoder false
protocolConfig.setBatchDecoder(false);
NettyCodecAdapter nettyCodecAdapter = NettyCodecAdapter.createTcpCodecAdapter(codec, protocolConfig);

ChannelHandler decoder = nettyCodecAdapter.getDecoder();
EmbeddedChannel embeddedChannel = new EmbeddedChannel();
embeddedChannel.pipeline().addLast(decoder);

ByteBuf byteBuf = AbstractByteBufAllocator.DEFAULT.heapBuffer();
byteBuf.writeBytes("testTcpDecodeIllegalPacket1".getBytes(StandardCharsets.UTF_8));

// write illegal packet
EmbeddedChannel tmpEmbeddedChannel = embeddedChannel;
DecoderException decoderException = Assert.assertThrows(DecoderException.class, () -> {
tmpEmbeddedChannel.writeInbound(byteBuf);
});

Assert.assertTrue(decoderException.getCause() instanceof TransportException);
Assert.assertEquals(byteBuf.refCnt(), 0);
}
}
Loading