Skip to content

Commit

Permalink
changes done for adding streaming support in XLSX module.
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs committed Sep 26, 2024
1 parent 12d4727 commit 690eac4
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 20 deletions.
26 changes: 22 additions & 4 deletions format-xls/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<name>XLS format plugins</name>
<packaging>jar</packaging>
<properties>
<poi.version>5.2.4</poi.version>
<poi.version>5.2.5</poi.version>
<log4j-core.version>2.20.0</log4j-core.version>
</properties>

Expand All @@ -45,7 +45,7 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>compile</scope>
<version>${log4j-core.version}</version>
<version>${log4j-core.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
Expand All @@ -64,12 +64,30 @@
<artifactId>format-common</artifactId>
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>poi-shared-strings</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.cdap.plugin.format.xls.input;


import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Preconditions;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
Expand All @@ -25,17 +28,23 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.poi.EmptyFileException;
import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.util.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;


Expand All @@ -51,11 +60,12 @@ public class XlsInputFormat extends PathTrackingInputFormat {
public static final String SHEET_VALUE = "sheetValue";
public static final String NAME_SKIP_HEADER = "skipHeader";
public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";
private static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2;

@Override
protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
Configuration jobConf = context.getConfiguration();
boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false);
boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);
Expand All @@ -65,6 +75,10 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow);
}

public boolean isSplitable(JobContext context, Path file) {
return false;
}

/**
* Reads Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record.
*/
Expand All @@ -74,11 +88,7 @@ public static class XlsRecordReader extends RecordReader<NullWritable, Structure
FormulaEvaluator formulaEvaluator;
// Builder for building structured record
private StructuredRecord.Builder valueBuilder;
private Sheet workSheet;
// InputStream handler for Excel files.
private FSDataInputStream fileIn;
// Specifies the row index.
private int rowIndex;
// Specifies last row num.
private int lastRowNum;
private boolean isRowNull;
Expand All @@ -87,6 +97,11 @@ public static class XlsRecordReader extends RecordReader<NullWritable, Structure
private final Schema outputSchema;
private final boolean terminateIfEmptyRow;
private final boolean skipFirstRow;
private int rowCount;
private Iterator<Row> rows;
// Specifies the row index.
private long rowIdx;


/**
* Constructor for XlsRecordReader.
Expand All @@ -113,8 +128,32 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(jobConf);
fileIn = fs.open(file);

try (Workbook workbook = WorkbookFactory.create(fileIn)) {
Sheet workSheet;
Workbook workbook;
boolean isStreaming = false;
try {
// Use Magic Bytes to detect the file type
InputStream is = FileMagic.prepareToCheckMagic(fileIn);
byte[] emptyFileCheck = new byte[1];
is.mark(emptyFileCheck.length);
if (is.read(emptyFileCheck) < emptyFileCheck.length) {
throw new EmptyFileException();
}
is.reset();

final FileMagic fm = FileMagic.valueOf(is);
switch (fm) {
case OOXML:
workbook = StreamingReader.builder().rowCacheSize(10).open(is);
isStreaming = true;
break;
case OLE2:
IOUtils.setByteArrayMaxOverride(EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT);
workbook = WorkbookFactory.create(is);
break;
default:
throw new IOException("Can't open workbook - unsupported file type: " + fm);
}
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
// Check if user wants to access with name or number
Expand All @@ -127,37 +166,43 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
} catch (Exception e) {
throw new IOException("Exception while reading excel sheet. " + e.getMessage(), e);
}

// As we cannot get the number of rows in a sheet while streaming.
// -1 is used as rowCount to indicate that all rows should be read.
rowCount = isStreaming ? -1 : workSheet.getPhysicalNumberOfRows();
lastRowNum = workSheet.getLastRowNum();
rows = workSheet.iterator();
isRowNull = false;
rowIndex = skipFirstRow ? 1 : 0;
rowIdx = 0;
valueBuilder = StructuredRecord.builder(outputSchema);
if (skipFirstRow) {
Preconditions.checkArgument(rows.hasNext(), "No rows found on sheet %s", sheetValue);
rowIdx = 1;
rows.next();
}
}

@Override
public boolean nextKeyValue() {
// If any is true, then we stop processing.
if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
if (!rows.hasNext() || rowCount == 0 || (isRowNull && terminateIfEmptyRow)) {
return false;
}
// Get the next row.
Row row = workSheet.getRow(rowIndex);
Row row = rows.next();
valueBuilder = rowConverter.convert(row, outputSchema);
if (row == null || valueBuilder == null) {
isRowNull = true;
// set valueBuilder to a new builder with all fields set to null
valueBuilder = StructuredRecord.builder(outputSchema);
}
// if all fields are null, then the row is null
rowIndex++;

rowIdx++;
// Stop processing if the row is null and terminateIfEmptyRow is true.
return !isRowNull || !terminateIfEmptyRow;
}

@Override
public float getProgress() {
return (float) rowIndex / lastRowNum;
return (float) rowIdx / lastRowNum;
}

@Override
Expand Down

0 comments on commit 690eac4

Please sign in to comment.