diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java index a9aee5a9903..de9ce644a6d 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.MetadataUtils; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions; +import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator; import org.apache.drill.exec.store.easy.json.parser.TokenIterator; import org.apache.drill.exec.store.kafka.KafkaStoragePlugin; import org.apache.drill.exec.store.kafka.MetaDataField; @@ -37,8 +38,6 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.util.Iterator; import java.util.Properties; import java.util.StringJoiner; @@ -50,7 +49,7 @@ public class JsonMessageReader implements MessageReader { private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class); - private final SingleElementIterator stream = new SingleElementIterator<>(); + private final ClosingStreamIterator stream = new ClosingStreamIterator(); private KafkaJsonLoader kafkaJsonLoader; private ResultSetLoader resultSetLoader; @@ -156,24 +155,4 @@ public String toString() { .add("resultSetLoader=" + resultSetLoader) .toString(); } - - public static class SingleElementIterator implements Iterator { - private T value; - - @Override - public boolean hasNext() { - return value != null; - } - - @Override - public T next() { - T value = this.value; - this.value = null; - return value; - } - - public void setValue(T value) { - this.value = value; - } - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java new file mode 100644 index 00000000000..0b67049e573 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.json.loader; + +import java.io.InputStream; + +import org.apache.drill.common.AutoCloseables; + +import java.util.Iterator; + +/** + * It allows setting the current value in the iterator and can be used once after {@link #next} call + * + * @param type of the value + */ +public class ClosingStreamIterator implements Iterator { + private InputStream value, last; + + @Override + public boolean hasNext() { + if (value == null) { + AutoCloseables.closeSilently(last); + return false; + } + return true; + } + + @Override + public InputStream next() { + this.last = this.value; + this.value = null; + return this.last; + } + + public void setValue(InputStream value) { + AutoCloseables.closeSilently(this.value); + this.value = value; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java index edc687f4aa9..2886815db8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.EmptyErrorContext; import org.apache.drill.common.exceptions.UserException; @@ -232,6 +233,7 @@ interface NullTypeMarker { private final JsonStructureParser parser; private final FieldFactory fieldFactory; private final ImplicitColumns implicitFields; + private final Iterable streams; private final int maxRows; private boolean eof; @@ -254,6 +256,7 @@ protected JsonLoaderImpl(JsonLoaderBuilder builder) { this.implicitFields = builder.implicitFields; this.maxRows = builder.maxRows; this.fieldFactory = buildFieldFactory(builder); + this.streams = builder.streams; this.parser = buildParser(builder); } @@ -354,6 +357,13 @@ protected void endBatch() { @Override // JsonLoader public void close() { parser.close(); + for (InputStream stream: streams) { + try { + AutoCloseables.close(stream); + } catch (Exception ex) { + logger.warn("Failed to close an input stream, a system resource leak may ensue.", ex); + } + } } @Override // ErrorFactory