diff --git a/src/main/java/org/logstash/beats/V2Batch.java b/src/main/java/org/logstash/beats/V2Batch.java index ffc03374..57d6624e 100644 --- a/src/main/java/org/logstash/beats/V2Batch.java +++ b/src/main/java/org/logstash/beats/V2Batch.java @@ -1,10 +1,12 @@ package org.logstash.beats; +import java.io.Closeable; +import java.util.Iterator; +import java.util.NoSuchElementException; + import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import java.util.Iterator; - /** * Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use. */ @@ -26,9 +28,13 @@ public byte getProtocol() { return Protocol.VERSION_2; } - public Iterator iterator(){ - internalBuffer.resetReaderIndex(); + public Iterator iterator() { return new Iterator() { + private int read = 0; + private ByteBuf readerBuffer = internalBuffer.asReadOnly(); + { + readerBuffer.resetReaderIndex(); + } @Override public boolean hasNext() { return read < written; @@ -36,10 +42,13 @@ public boolean hasNext() { @Override public Message next() { - int sequenceNumber = internalBuffer.readInt(); - int readableBytes = internalBuffer.readInt(); - Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes)); - internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes); + if (read >= written) { + throw new NoSuchElementException(); + } + int sequenceNumber = readerBuffer.readInt(); + int readableBytes = readerBuffer.readInt(); + Message message = new Message(sequenceNumber, readerBuffer.slice(readerBuffer.readerIndex(), readableBytes)); + readerBuffer.readerIndex(readerBuffer.readerIndex() + readableBytes); message.setBatch(V2Batch.this); read++; return message; @@ -92,4 +101,10 @@ void addMessage(int sequenceNumber, ByteBuf buffer, int size) { public void release() { internalBuffer.release(); } + + @Override + public void close() { + release(); + } + } diff --git a/src/test/java/org/logstash/beats/V2BatchTest.java b/src/test/java/org/logstash/beats/V2BatchTest.java index 45a668a1..4948e97e 100644 --- a/src/test/java/org/logstash/beats/V2BatchTest.java +++ b/src/test/java/org/logstash/beats/V2BatchTest.java @@ -19,48 +19,49 @@ public class V2BatchTest { @Test public void testIsEmpty() { - V2Batch batch = new V2Batch(); - assertTrue(batch.isEmpty()); - ByteBuf content = messageContents(); - batch.addMessage(1, content, content.readableBytes()); - assertFalse(batch.isEmpty()); + try (V2Batch batch = new V2Batch()){ + assertTrue(batch.isEmpty()); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertFalse(batch.isEmpty()); + } } @Test public void testSize() { - V2Batch batch = new V2Batch(); - assertEquals(0, batch.size()); - ByteBuf content = messageContents(); - batch.addMessage(1, content, content.readableBytes()); - assertEquals(1, batch.size()); + try (V2Batch batch = new V2Batch()) { + assertEquals(0, batch.size()); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertEquals(1, batch.size()); + } } @Test - public void TestGetProtocol() { - assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol()); + public void testGetProtocol() { + try (V2Batch batch = new V2Batch()) { + assertEquals(Protocol.VERSION_2, batch.getProtocol()); + } } @Test - public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() { - V2Batch batch = new V2Batch(); - int numberOfEvent = 2; - - batch.setBatchSize(numberOfEvent); - - for(int i = 1; i <= numberOfEvent; i++) { - ByteBuf content = messageContents(); - batch.addMessage(i, content, content.readableBytes()); + public void testCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() { + try (V2Batch batch = new V2Batch()) { + int numberOfEvent = 2; + batch.setBatchSize(numberOfEvent); + for (int i = 1; i <= numberOfEvent; i++) { + ByteBuf content = messageContents(); + batch.addMessage(i, content, content.readableBytes()); + } + assertTrue(batch.isComplete()); } - - assertTrue(batch.isComplete()); } @Test public void testBigBatch() { - V2Batch batch = new V2Batch(); - int size = 4096; - assertEquals(0, batch.size()); - try { + try (V2Batch batch = new V2Batch()) { + int size = 4096; + assertEquals(0, batch.size()); ByteBuf content = messageContents(); for (int i = 0; i < size; i++) { batch.addMessage(i, content, content.readableBytes()); @@ -70,22 +71,19 @@ public void testBigBatch() { for (Message message : batch) { assertEquals(message.getSequence(), i++); } - }finally { - batch.release(); } - } + } @Test - public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() { - V2Batch batch = new V2Batch(); - int numberOfEvent = 2; - - batch.setBatchSize(numberOfEvent); - ByteBuf content = messageContents(); - batch.addMessage(1, content, content.readableBytes()); - - assertFalse(batch.isComplete()); + public void testCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() { + try (V2Batch batch = new V2Batch()) { + int numberOfEvent = 2; + batch.setBatchSize(numberOfEvent); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertFalse(batch.isComplete()); + } } public static ByteBuf messageContents() { @@ -98,4 +96,4 @@ public static ByteBuf messageContents() { throw new RuntimeException(e); } } -} \ No newline at end of file +}