Skip to content

Latest commit

 

History

History
134 lines (105 loc) · 4.16 KB

sqoop.md

File metadata and controls

134 lines (105 loc) · 4.16 KB

sqoop源码解析

自定义hive配置

问题

sqoop操作hive时,会读取hive的配置文件,但是如果想要通过参数动态的覆盖默认的hive配置该如何?

探讨

首先看这个类org.apache.sqoop.hive.HiveConfig

//class: org.apache.sqoop.hive.HiveConfig
//line: 42
public static Configuration getHiveConf(Configuration conf) throws IOException {
    //...
      Class HiveConfClass = Class.forName(HIVE_CONF_CLASS);
      return ((Configuration)(HiveConfClass.getConstructor(Configuration.class, Class.class)
          .newInstance(conf, Configuration.class)));
    //...
  }

这段代码获取的是org.apache.hadoop.hive.conf.HiveConf类的实例。在HiveConf中有这段代码

// class: org.apache.hadoop.hive.conf.HiveConf
// line: 2935
public static Map<String, String> getConfSystemProperties() {
    Map<String, String> systemProperties = new HashMap<String, String>();
    for (ConfVars oneVar : ConfVars.values()) {
      if (System.getProperty(oneVar.varname) != null) {
        if (System.getProperty(oneVar.varname).length() > 0) {
          systemProperties.put(oneVar.varname, System.getProperty(oneVar.varname));
        }
      }
    }
    return systemProperties;
  }

在sqoopgetHiveConf方法中去实例化HiveConf类时,会读取hive的配置,其中会调用到上述的getConfSystemProperties方法,该方法会读取系统变量中的参数来覆盖默认的配置。这些可用的参数可以在ConfVars枚举类中看到。

解决

因此如果想要覆盖默认的hive配置只需要添加系统变量即可,例如

java \
-D'hive.metastore.uris=thrift://service:90831' \
-cp libs \
org.apache.sqoop.Sqoop args

也可以通过修改sqoop配置文件的方式

获取sqoop导入导出记录数量

问题

如何获取sqoop导入或者导出数据条数,以进行例如上报日志等操作?

探讨

mapreduce有一个计数器的东西,其源码已经实现了对map输入输出的计数。

//class: org.apache.sqoop.config.ConfigurationHelper
//line: 73
/**
* @return the number of mapper output records from a job using its counters.
*/
public static long getNumMapOutputRecords(Job job)
    throws IOException, InterruptedException {
return job.getCounters().findCounter(
    ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
    ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS).getValue();
}

/**
* @return the number of mapper input records from a job using its counters.
*/
public static long getNumMapInputRecords(Job job)
    throws IOException, InterruptedException {
return job.getCounters().findCounter(
        ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
        ConfigurationConstants.COUNTER_MAP_INPUT_RECORDS).getValue();
}

对于导入操作来说,数据传入map并从map按照一条条的数据写出到hdfs中,因此getNumMapOutputRecords便是数据导入的条数,
对于导出操作来说,传入map的数据是hdfs中的数据,一般情况下便是一条条的数据,因此getNumMapInputRecords便是数据导出的条数。

而对于一些特殊格式的数据,sqoop重写了计数器的计数逻辑。

//class: org.apache.sqoop.mapreduce.mainframe.MainframeDatasetImportMapper
private long numberOfRecords;

//line: 50
public void map(LongWritable key,  SqoopRecord val, Context context)
    throws IOException, InterruptedException {
String dataset = inputSplit.getCurrentDataset();
outkey.set(val.toString());
numberOfRecords++;
mos.write(outkey, NullWritable.get(), dataset);
}
//line: 68
@Override
protected void cleanup(Context context)
    throws IOException, InterruptedException {
super.cleanup(context);
mos.close();
context.getCounter(
    ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
    ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
    .increment(numberOfRecords);
}

在map的cleanup方法中进行了计数

解决

因此只需要从计数器获取数据即可

//导出(export)操作条数
long inputRecords = ConfigurationHelper.getNumMapInputRecords(job);

//导入(import)操作条数
long outputRecords = ConfigurationHelper.getNumMapOutputRecords(job);