Skip to content

Commit

Permalink
blocks again
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 21, 2024
1 parent 2fda71a commit a7a45cf
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
final ExecutorService pool = CommonThreadPool.get(numThreads);
try {
if(splits.length == 1){
new ReadRowsTask(splits[0], informat, job, dest, 0, true).call();
new ReadRowsTask(splits[0], informat, job, dest, 0, true, 0).call();
return;
}

Expand All @@ -80,10 +80,10 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
ArrayList<Future<Object>> tasks2 = new ArrayList<>();
for( int i=0; i<splits.length -1; i++ ){
long tmp = cret.get(i).get();
tasks2.add(pool.submit(new ReadRowsTask(splits[i], informat, job, dest, (int) offset, i==0)));
tasks2.add(pool.submit(new ReadRowsTask(splits[i], informat, job, dest, (int) offset, i==0, i)));
offset += tmp;
}
tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, (int) offset, splits.length==1)));
tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, (int) offset, splits.length==1, splits.length-1)));

//read individual splits
for(Future<Object> a : tasks2)
Expand Down Expand Up @@ -168,25 +168,27 @@ private class ReadRowsTask implements Callable<Object>
private FrameBlock _dest = null;
private int _offset = -1;
private boolean _isFirstSplit = false;
private int _id;


public ReadRowsTask(InputSplit split, TextInputFormat informat, JobConf job,
FrameBlock dest, int offset, boolean first)
FrameBlock dest, int offset, boolean first, int id)
{
_split = split;
_informat = informat;
_job = job;
_dest = dest;
_offset = offset;
_isFirstSplit = first;
_id = id;
}

@Override
public Object call() throws Exception {
LOG.debug("read csv start : " + _offset);
LOG.debug("read csv start : " + _id + "---" + _offset);
readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(),
_dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit);
LOG.debug("read csv end : " + _offset);
LOG.debug("read csv end : " + _id + "---" + _offset);
return null;
}
}
Expand Down

0 comments on commit a7a45cf

Please sign in to comment.