MapReduce jobs over WMArchive store


This page is obsolete for end-users

We keep it for reference.

How to run MapReduce jobs over WMArchive store

The pydoop package provides a convenient way to run MapReduce jobs over files stored on HDFS. Here we explain in detail how to write a simple MapReduce job and run it on a Hadoop cluster.
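
Before writing the job itself, it may help to verify that pydoop can talk to HDFS at all. A minimal check from a Python prompt (the /user/valya/test/data path is simply the example location used later on this page):

import pydoop.hdfs as hdfs

# list the files we are going to process later on
for name in hdfs.ls('/user/valya/test/data'):
    print(name)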

First, we need to write mapper/reducer classes following the instructions provided on the pydoop MapReduce API tutorial page. Below is a working example for WMArchive data:

# system modules
import os
import sys
import io
import gzip

# pydoop modules
import pydoop.mapreduce.api as api
import pydoop.mapreduce.pipes as pp
from pydoop.avrolib import AvroContext


# avro modules
import avro.schema
import avro.io

# hdfs pydoop modules
import pydoop.hdfs as hdfs

# WMArchive modules

def read(fname):
    "Read data from given file name"
    uri = '/user/valya/test/fwjr_proc.avsc' # HDFS path to fwjr schema file
    schemaData = hdfs.load(uri)
    schema = avro.schema.parse(schemaData)

    out = []
    data = hdfs.load(fname)
    bytes_reader = io.BytesIO(data)

    if  fname.endswith('.gz'):
        # use gzip'ed reader and pass to it BytesIO as file object
        gzip_reader = gzip.GzipFile(fileobj=bytes_reader)
        decoder = avro.io.BinaryDecoder(gzip_reader)
    else:
        # use non-compressed reader
        decoder = avro.io.BinaryDecoder(bytes_reader)

    reader = avro.io.DatumReader(schema)
    while True:
        try:
            rec = reader.read(decoder)
            out.append(rec)
        except Exception:
            # the DatumReader raises once no more records can be decoded,
            # i.e. we reached the end of the stream
            break
    # close gzip stream if necessary
    if  fname.endswith('.gz'):
        gzip_reader.close()
    # close bytes stream
    bytes_reader.close()
    return out

class Reader(api.RecordReader):
    def __init__(self, context):
        super(Reader, self).__init__(context)
        fid = context.input_split.filename # context will provide file names
        self.data = read(fid) # read data
        self.idx = 0 # first element

    def next(self):
        "Iterator over our data, the WMArchive data will be provided as list of records"
        if  self.idx >= len(self.data):
            raise StopIteration
        rec = self.data[self.idx] # access to individual record
        self.idx += 1
        key = rec['jobid']
        return key, rec

    def get_progress(self):
        "Mandatory function to implement progress"
        if not self.data:
            return 1.0
        return float(self.idx)/len(self.data)

class Mapper(api.Mapper):
    def map(self, ctx):
        rec = ctx.value
        jid = rec['jobid']
        if jid is not None:
            ctx.emit(jid, rec['fwjr']['task'])

class Reducer(api.Reducer):
    def reduce(self, ctx):
        ctx.emit('', {'jobid': ctx.key, 'task': ctx.values})

def __main__():
    factory = pp.Factory(mapper_class=Mapper, reducer_class=Reducer, record_reader_class=Reader)
    pp.run_task(factory, private_encoding=True)

The user provides three or four classes: the Mapper and the Reducer, which operate on WMArchive records, plus a Reader (and an optional Writer) class to read/write records from/to the stored files. The Reader must be able to read data from files stored on HDFS and yield data records to the Mapper. The Writer class may be used if we want to write a custom data structure back from the Reducer step.
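
The example above does not define a Writer. Purely as an illustration, here is a minimal sketch of what one could look like, assuming the standard MRv2 job-configuration keys mapreduce.task.partition and mapreduce.task.output.dir are reachable through context.job_conf (names that should be checked against your pydoop/Hadoop versions). It writes each reducer output record as one JSON document into the task output directory on HDFS:

import json

import pydoop.hdfs as hdfs
import pydoop.mapreduce.api as api

class Writer(api.RecordWriter):
    "Hypothetical record writer: one JSON document per reducer output record"
    def __init__(self, context):
        super(Writer, self).__init__(context)
        jc = context.job_conf
        part = int(jc['mapreduce.task.partition'])
        outdir = jc['mapreduce.task.output.dir']
        self.fobj = hdfs.open('%s/part-r-%05d' % (outdir, part), 'w')

    def emit(self, key, value):
        # value is whatever the Reducer emitted; it must be JSON-serializable here
        self.fobj.write(json.dumps(value) + '\n')

    def close(self):
        self.fobj.close()

Such a class would be registered via record_writer_class=Writer in the pp.Factory call, and the Java record writer would have to be disabled at submission time (pydoop submit provides a --do-not-use-java-record-writer option analogous to the reader one used below).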

If we save the aforementioned code into mr.py, we can submit a Hadoop job in the following way:

#!/bin/bash
source setup.sh
hdfs=hdfs://p01001532965510.cern.ch:9000/user/valya/test
input=$hdfs/data/
output=$hdfs/mrout
hadoop fs -rm -r $output
ifile=mr.py
module=mr
pydoop submit \
    --upload-archive-to-cache pydoop.tgz \
    --upload-archive-to-cache avro.tgz \
    --do-not-use-java-record-reader \
    --log-level DEBUG \
    --num-reducers 1 \
    --upload-file-to-cache $ifile \
    --mrv2 $module $input $output

This bash script specifies the following:

  • input defines the location of our data on HDFS
  • output defines the location for the output of our MapReduce job
  • upload-archive-to-cache uploads custom Python modules our job may require to the Hadoop distributed cache; see the pydoop documentation for details on how to build proper tgz files, and the sketch after this list for one possible approach
  • num-reducers defines how many reducer tasks we'd like to run; if you skip this option the default is 3
  • upload-file-to-cache defines which file we upload to the worker nodes (mr.py)
  • finally, the --mrv2 flag specifies that we run MapReduce version 2; it is followed by the name of our Python module and the input/output locations for our data
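
For reference, one possible way to build such an archive is sketched below. This is not taken from the original setup, and the exact layout expected by the distributed cache should be verified for your installation: the #avro fragment in the cache URI becomes a directory in the task's working directory, which is on PYTHONPATH (see the generated PYTHONPATH="${PWD}:..." in the log below), so the package contents are placed at the top level of the archive to make import avro work once it is unpacked:

import os
import tarfile

import avro  # the package we want to ship to the worker nodes

# directory that contains __init__.py, schema.py, io.py, ...
pkg_dir = os.path.dirname(avro.__file__)

# put the package *contents* at the top level of the archive, so that the
# directory Hadoop unpacks it into (symlinked as "avro" in the task's
# working directory) is itself importable as the avro package
with tarfile.open('avro.tgz', 'w:gz') as tar:
    tar.add(pkg_dir, arcname='.')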

I successfully ran this job on the analytix.cern.ch node, which is connected to the central CERN Hadoop cluster. To do so, I uploaded a few data files into the /user/valya/test/data area and then ran the run_mr.sh script as follows:

nohup ./run_mr.sh 2>&1 1>& mr.log < /dev/null &

Here is the output of this job:

DEBUG:PydoopSubmitter:Submitter class: it.crs4.pydoop.mapreduce.pipes.Submitter
DEBUG:PydoopSubmitter:properties after projection: {'mapreduce.job.name': 'pydoop', 'mapreduce.job.reduces': 1, 'mapreduce.map.output.compress': 'true', 'mapreduce.job.cache.archives': 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/pydoop.tgz#pydoop,hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/avro.tgz#avro', 'mapreduce.pipes.isjavarecordreader': 'false', 'mapreduce.job.cache.files': 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/mr.py#mr.py', 'mapred.create.symlink': 'yes', 'bl.libhdfs.opts': '-Xmx48m', 'mapreduce.pipes.isjavarecordwriter': 'true'}
DEBUG:PydoopSubmitter:remote_wd: hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea
DEBUG:PydoopSubmitter:remote_exe: hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/4235df98-d85a-4f67-81aa-ffd41e8e3304
DEBUG:PydoopSubmitter:remotes: [('file:///afs/cern.ch/work/v/valya/hadoop/mr.py', 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/mr.py', 'mr.py'), ('file:///afs/cern.ch/work/v/valya/hadoop/pydoop.tgz', 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/pydoop.tgz', 'pydoop'), ('file:///afs/cern.ch/work/v/valya/hadoop/avro.tgz', 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/avro.tgz', 'avro')]
DEBUG:PydoopSubmitter:Setting env variable PATH=/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/xgboost/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/vowpal_wabbit/utl/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/vowpal_wabbit/vowpalwabbit/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/R-3.1.2/install_dir/bin/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//bin:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/bin::/usr/sue/bin:/usr/lib64/qt-3.3/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/afs/cern.ch/user/v/valya/bin:/afs/cern.ch/user/v/valya/workspace/gopath/bin:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/libfm-1.42.src/bin:/afs/cern.ch/user/v/valya/workspace/hadoop/mongodb/bin:/afs/cern.ch/user/v/valya/workspace/hadoop/pydoop/install/bin
DEBUG:PydoopSubmitter:Setting env variable LD_LIBRARY_PATH=:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr/lib64/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr/jvm/java-1.6.0-openjdk-1.6.0.35.x86_64/jre/lib/amd64/server/:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/lib:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/lib64
DEBUG:PydoopSubmitter:Setting env variable PYTHONPATH=${PWD}:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//lib64/python2.7/site-packages/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//lib/python2.7/site-packages/::/afs/cern.ch/user/v/valya/workspace/hadoop/usr/lib/python2.7/site-packages
DEBUG:PydoopSubmitter:Generated pipes_code:

 #!/bin/bash
""":"
printenv 1>&2
echo PWD=${PWD} 1>&2
echo ls -l; ls -l  1>&2
export HOME="/afs/cern.ch/user/v/valya"
export PATH="/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/xgboost/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/vowpal_wabbit/utl/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/vowpal_wabbit/vowpalwabbit/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/R-3.1.2/install_dir/bin/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//bin:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/bin::/usr/sue/bin:/usr/lib64/qt-3.3/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/afs/cern.ch/user/v/valya/bin:/afs/cern.ch/user/v/valya/workspace/gopath/bin:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/libfm-1.42.src/bin:/afs/cern.ch/user/v/valya/workspace/hadoop/mongodb/bin:/afs/cern.ch/user/v/valya/workspace/hadoop/pydoop/install/bin"
export LD_LIBRARY_PATH=":/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr/lib64/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr/jvm/java-1.6.0-openjdk-1.6.0.35.x86_64/jre/lib/amd64/server/:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/lib:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/lib64"
export PYTHONPATH="${PWD}:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//lib64/python2.7/site-packages/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//lib/python2.7/site-packages/::/afs/cern.ch/user/v/valya/workspace/hadoop/usr/lib/python2.7/site-packages"
echo PATH=${PATH} 1>&2
echo LD_LIBRARY_PATH=${LD_LIBRARY_PATH} 1>&2
echo PYTHONPATH=${PYTHONPATH} 1>&2
echo HOME=${HOME} 1>&2
echo "executable is $(type -P python)" 1>&2
echo cmd to execute: exec "python" -u "$0" "$@"
exec "python" -u "$0" "$@"
":"""
import sys
sys.stderr.write("%r\n" % sys.path)
sys.stderr.write("%s\n" % sys.version)
import mr as module
module.__main__()

DEBUG:PydoopSubmitter:created and chmod-ed: hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea
DEBUG:PydoopSubmitter:Setting env variable PATH=/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/xgboost/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/vowpal_wabbit/utl/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/vowpal_wabbit/vowpalwabbit/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/R-3.1.2/install_dir/bin/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//bin:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/bin::/usr/sue/bin:/usr/lib64/qt-3.3/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/afs/cern.ch/user/v/valya/bin:/afs/cern.ch/user/v/valya/workspace/gopath/bin:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/libfm-1.42.src/bin:/afs/cern.ch/user/v/valya/workspace/hadoop/mongodb/bin:/afs/cern.ch/user/v/valya/workspace/hadoop/pydoop/install/bin
DEBUG:PydoopSubmitter:Setting env variable LD_LIBRARY_PATH=:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr/lib64/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr/jvm/java-1.6.0-openjdk-1.6.0.35.x86_64/jre/lib/amd64/server/:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/lib:/afs/.cern.ch/cms/slc6_amd64_gcc481/external/gcc/4.8.1/lib64
DEBUG:PydoopSubmitter:Setting env variable PYTHONPATH=${PWD}:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//lib64/python2.7/site-packages/:/afs/cern.ch/user/v/valya/workspace/DataAnalysis/SL6/usr//lib/python2.7/site-packages/::/afs/cern.ch/user/v/valya/workspace/hadoop/usr/lib/python2.7/site-packages
DEBUG:PydoopSubmitter:dumped pipes_code to: hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/4235df98-d85a-4f67-81aa-ffd41e8e3304
DEBUG:PydoopSubmitter:uploading: file:///afs/cern.ch/work/v/valya/hadoop/mr.py to hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/mr.py
DEBUG:PydoopSubmitter:uploading: file:///afs/cern.ch/work/v/valya/hadoop/pydoop.tgz to hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/pydoop.tgz
DEBUG:PydoopSubmitter:uploading: file:///afs/cern.ch/work/v/valya/hadoop/avro.tgz to hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/avro.tgz
DEBUG:PydoopSubmitter:Created remote paths:
DEBUG:PydoopSubmitter:HADOOP_CLASSPATH: ':/afs/cern.ch/user/v/valya/workspace/hadoop/usr/lib/python2.7/site-packages/pydoop-1.1.0-py2.7.egg/pydoop/pydoop.jar'
DEBUG:PydoopSubmitter:final args: ['/usr/bin/hadoop', 'it.crs4.pydoop.mapreduce.pipes.Submitter', '-D', 'mapreduce.job.name=pydoop', '-D', 'mapreduce.job.reduces=1', '-D', 'mapreduce.map.output.compress=true', '-D', 'mapreduce.job.cache.archives=hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/pydoop.tgz#pydoop,hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/avro.tgz#avro', '-D', 'mapreduce.pipes.isjavarecordreader=false', '-D', 'mapreduce.job.cache.files=hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/mr.py#mr.py', '-D', 'mapred.create.symlink=yes', '-D', 'bl.libhdfs.opts=-Xmx48m', '-D', 'mapreduce.pipes.isjavarecordwriter=true', '-libjars', '/afs/cern.ch/user/v/valya/workspace/hadoop/usr/lib/python2.7/site-packages/pydoop-1.1.0-py2.7.egg/pydoop/pydoop.jar', '-input', 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/data/055530c1309dbcd5de95127ebff0c637.avro.gz', '-output', 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/mrout', '-program', 'hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea/4235df98-d85a-4f67-81aa-ffd41e8e3304']
15/12/26 16:53:05 INFO client.RMProxy: Connecting to ResourceManager at p01001532965510.cern.ch/128.142.36.226:8032
15/12/26 16:53:05 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 1110598 for valya on 128.142.36.226:9000
15/12/26 16:53:05 INFO security.TokenCache: Got dt for hdfs://p01001532965510.cern.ch:9000; Kind: HDFS_DELEGATION_TOKEN, Service: 128.142.36.226:9000, Ident: (HDFS_DELEGATION_TOKEN token 1110598 for valya)
15/12/26 16:53:06 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
15/12/26 16:53:06 INFO input.FileInputFormat: Total input paths to process : 1
15/12/26 16:53:06 INFO mapreduce.JobSubmitter: number of splits:1
15/12/26 16:53:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446446075159_42435
15/12/26 16:53:08 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: 128.142.36.226:9000, Ident: (HDFS_DELEGATION_TOKEN token 1110598 for valya)
15/12/26 16:53:08 INFO mapred.YARNRunner: Job jar is not present. Not adding any jar to the list of resources.
15/12/26 16:53:08 INFO impl.YarnClientImpl: Submitted application application_1446446075159_42435
15/12/26 16:53:08 INFO mapreduce.Job: The url to track the job: http://p01001532965510.cern.ch:8088/proxy/application_1446446075159_42435/
15/12/26 16:53:08 INFO mapreduce.Job: Running job: job_1446446075159_42435
15/12/26 16:53:20 INFO mapreduce.Job: Job job_1446446075159_42435 running in uber mode : false
15/12/26 16:53:20 INFO mapreduce.Job:  map 0% reduce 0%
15/12/26 16:53:44 INFO mapreduce.Job:  map 100% reduce 0%
15/12/26 16:54:04 INFO mapreduce.Job:  map 100% reduce 100%
15/12/26 16:54:05 INFO mapreduce.Job: Job job_1446446075159_42435 completed successfully
15/12/26 16:54:05 INFO mapreduce.Job: Counters: 51
	File System Counters
		FILE: Number of bytes read=115
		FILE: Number of bytes written=259768
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=162
		HDFS: Number of bytes written=78
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Other local map tasks=1
		Total time spent by all maps in occupied slots (ms)=87316
		Total time spent by all reduces in occupied slots (ms)=87485
		Total time spent by all map tasks (ms)=21829
		Total time spent by all reduce tasks (ms)=17497
		Total vcore-seconds taken by all map tasks=21829
		Total vcore-seconds taken by all reduce tasks=17497
		Total megabyte-seconds taken by all map tasks=44705792
		Total megabyte-seconds taken by all reduce tasks=44792320
	Map-Reduce Framework
		Map input records=0
		Map output records=1
		Map output bytes=96
		Map output materialized bytes=107
		Input split bytes=162
		Combine input records=0
		Combine output records=0
		Reduce input groups=1
		Reduce shuffle bytes=107
		Reduce input records=1
		Reduce output records=1
		Spilled Records=2
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=86
		CPU time spent (ms)=3700
		Physical memory (bytes) snapshot=582696960
		Virtual memory (bytes) snapshot=5904072704
		Total committed heap usage (bytes)=1248329728
	Pydoop TaskContext
		TIME_MAP CALLS (ms)=0
		TIME_REDUCE CALLS (ms)=1
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=0
	File Output Format Counters
		Bytes Written=78
15/12/26 16:54:05 INFO util.ExitUtil: Exiting with status 0
INFO:PydoopSubmitter:Done
DEBUG:PydoopSubmitter:Removing temporary working directory hdfs://p01001532965510.cern.ch:9000/user/valya/test/pydoop_submit_e08a2183b7f141819d26a92182f996ea

The actual output of the MapReduce job was stored in HDFS under /user/valya/test/mrout, and I was able to access it as follows:

bash$ hadoop fs -ls /user/valya/test/mrout
Found 2 items
-rw-r--r--   3 valya supergroup          0 2015-12-28 23:29 /user/valya/test/mrout/_SUCCESS
-rw-r--r--   3 valya supergroup         78 2015-12-28 23:29 /user/valya/test/mrout/part-r-00000
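
The same file can also be read back directly from Python with pydoop.hdfs (the path is taken from the listing above):

import pydoop.hdfs as hdfs

# load the single reducer output file produced by the job
print(hdfs.load('/user/valya/test/mrout/part-r-00000'))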

Once the job completed, I copied the output to the local file system:

bash$ hadoop fs -get /user/valya/test/mrout/part-r-00000 .
bash$ cat part-r-00000
{'task': <generator object get_value_stream at 0x2899550>, 'jobid': 3268289}
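
Note that the task field holds the repr of a Python generator rather than the task names themselves: the Reducer emitted ctx.values, which pydoop supplies as an iterator, directly inside the output dictionary. If the actual values are wanted in the output, they can be materialized first, e.g.:

class Reducer(api.Reducer):
    def reduce(self, ctx):
        # list(ctx.values) materializes the iterator, so the task names
        # themselves (not a generator repr) end up in the emitted record
        ctx.emit('', {'jobid': ctx.key, 'task': list(ctx.values)})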