Apache Flink Kudu connector that sinks DataStream and DataSet records to Kudu tables.
This is a streaming example that sinks JSONObject records to a Kudu table. The example class is org.nn.flink.streaming.connectors.kudu.example.KuduSinkExample. A batch example can be found in org.nn.flink.streaming.connectors.kudu.example.KuduBatchOutputExample.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.socketTextStream("localhost", 44444);

Properties properties = new Properties();
properties.setProperty("timeoutMillis", "50000");
properties.setProperty("batchSize", "1000");

KuduSink<JSONObject> kudu = new KuduSink<JSONObject>(
        MASTER_ADDRESS,
        TABLE_NAME,
        KuduMapper.Mode.INSERT,
        new JsonKuduTableRowConverter(),
        properties);

stream
        .map(s -> (JSONObject) JSONObject.parse(s))
        .addSink(kudu);

env.execute("kudu");
JsonKuduTableRowConverter is a class that implements the KuduTableRowConverter interface and provides the conversion between the record type <IN> and TableRow.
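To illustrate the idea, here is a minimal, self-contained sketch of such a converter. The TableRow and KuduTableRowConverter stand-ins below are simplified assumptions (the connector's real interfaces may declare different members), and a plain Map is used in place of JSONObject so the snippet compiles on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in: a TableRow here is just a view over column name/value pairs.
interface TableRow {
    Map<String, Object> pairs();
}

// Simplified stand-in for the connector's converter interface.
interface KuduTableRowConverter<IN> {
    TableRow convert(IN record);
}

// Copies every key/value pair of a JSON-like map into a TableRow,
// mirroring what a JsonKuduTableRowConverter would do for a JSONObject.
class MapKuduTableRowConverter implements KuduTableRowConverter<Map<String, Object>> {
    @Override
    public TableRow convert(Map<String, Object> record) {
        Map<String, Object> copy = new LinkedHashMap<>(record);
        return () -> copy;
    }
}
```

A real implementation would map JSONObject fields onto the Kudu table's column names and types.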
Three Kudu operation modes are supported: INSERT, UPDATE, and UPSERT. The operation mode can be set by passing an argument to the constructor; the default mode is UPSERT.
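The three modes differ in how a write behaves when the primary key already exists in the table. The sketch below mimics that behaviour against an in-memory map purely for illustration; the connector itself delegates these semantics to the Kudu client.

```java
import java.util.HashMap;
import java.util.Map;

enum Mode { INSERT, UPDATE, UPSERT }

// Illustration only: models how Kudu treats a write to an existing key
// under each mode, using an in-memory map as the "table".
class ModeDemo {
    final Map<String, String> table = new HashMap<>();

    // Applies a (key, value) write under the given mode; returns true
    // if the write took effect.
    boolean write(Mode mode, String key, String value) {
        switch (mode) {
            case INSERT:
                // INSERT fails when the row already exists.
                if (table.containsKey(key)) return false;
                table.put(key, value);
                return true;
            case UPDATE:
                // UPDATE only touches rows that already exist.
                if (!table.containsKey(key)) return false;
                table.put(key, value);
                return true;
            case UPSERT:
            default:
                // UPSERT inserts new rows and overwrites existing ones.
                table.put(key, value);
                return true;
        }
    }
}
```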
This library supports sinking to multiple tables based on a custom strategy. Here is an example of sinking JSONObject records to multiple tables based on the value of a certain key.
KuduSink<JSONObject> kudu = new KuduSink<JSONObject>(
        MASTER_ADDRESS,
        new JsonKeyTableSerializationSchema("table_name", TABLE_PREFIX, TABLE_SUFFIX),
        KuduMapper.Mode.UPSERT,
        new JsonKuduTableRowConverter(),
        properties);
JsonKeyTableSerializationSchema implements the TableSerializationSchema interface. Its serializeTable() method contains the logic that determines the target table name for each record.
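As a sketch of that routing logic, the snippet below derives the table name from the value stored under a configurable JSON key, with an optional prefix and suffix. The TableSerializationSchema stand-in is an assumption for self-containment (the real interface may differ), and a plain Map takes the place of JSONObject.

```java
import java.util.Map;

// Simplified stand-in for the connector's per-record table-routing interface.
interface TableSerializationSchema<IN> {
    String serializeTable(IN record);
}

// Routes each JSON-like record to a table named after the value stored
// under a configurable key, wrapped with a prefix and suffix.
class KeyTableSchema implements TableSerializationSchema<Map<String, Object>> {
    private final String key;
    private final String prefix;
    private final String suffix;

    KeyTableSchema(String key, String prefix, String suffix) {
        this.key = key;
        this.prefix = prefix;
        this.suffix = suffix;
    }

    @Override
    public String serializeTable(Map<String, Object> record) {
        // e.g. key "table_name" with value "click" -> "<prefix>click<suffix>"
        return prefix + record.get(key) + suffix;
    }
}
```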