Kettle系列:使用Kudu API插入数据到Kudu中
2021-04-13 20:25
标签:value ret a.out session 时间 bsp random png key 本文详细介绍了在Kettle中使用 Kudu API将数据写入Kudu中, 从本文可以学习到: 本Kettle示例非常简单, Data Grid 组件定义一些sample data(包含多种数据类型), Java class将这些sample data写入kudu. Kudu表schema: 重点看Java class 代码: Kettle系列:使用Kudu API插入数据到Kudu中 标签:value ret a.out session 时间 bsp random png key 原文地址:https://www.cnblogs.com/harrychinese/p/kettle_insert_data_into_kudu.html
1. 如何编写一个简单的 Kettle 的 Used defined Java class.
2. 如何读取Kettle 每个记录的字段. 需要注意的是 getInteger() 返回的是Long 对象; 而获取 Timestamp 字段的方法是getDate().
3. 如何调用Kudu API. CREATE TABLE kudu_testdb.perf_test_t1
(
id string ENCODING PLAIN_ENCODING COMPRESSION SNAPPY,
int_value int,
bigint_value bigint,
timestamp_value timestamp,
bool_value int,
PRIMARY KEY (histdate,id)
)
PARTITION BY HASH (histdate,id) PARTITIONS 2
STORED AS KUDU
TBLPROPERTIES (
‘kudu.table_name‘ = ‘testdb.perf_test_t1‘,
‘kudu.master_addresses‘ = ‘10.205.6.1:7051,10.205.6.2:7051,10.205.7.3:7051‘
);
import java.sql.Timestamp;
import java.util.UUID;
import static java.lang.Math.toIntExact;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
private final static String KUDU_TABLE="testdb.perf_test_t1";
private final static String KUDU_SERVERS="10.205.6.1:7051,10.205.6.2:7051,10.205.7.3:7051";
private final static int OPERATION_BATCH = 50;
KuduClient client=null;
KuduSession session=null;
KuduTable table=null;
Integer recordCount=null;
SessionConfiguration.FlushMode mode;
private Object[] previousRow;
private Object[] currentRow;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
}
currentRow = getRow();
if (currentRow == null) {
setOutputDone();
return false;
}
try {
session.setFlushMode(mode);
session.setMutationBufferSpace(OPERATION_BATCH);
int uncommit = 0;
while(currentRow != null) {
Insert insert = table.newInsert();
PartialRow kuduRow = insert.getRow();
int intTmp;
Long longTmp;
String stringTmp;
java.util.Date dateTmp;
Boolean booleanTmp;
// kettle string -> kudu string
//kuduRow.addString("id",UUID.randomUUID().toString());
stringTmp = get(Fields.In, "id").getString(currentRow);
if (stringTmp!=null)
{
kuduRow.addString("id",stringTmp);
}
// kettle int -> kudu int
//import static java.lang.Math.toIntExact;
longTmp=get(Fields.In, "int_value").getInteger(currentRow);
if (longTmp!=null)
{
intTmp =toIntExact(get(Fields.In, "int_value").getInteger(currentRow));
kuduRow.addInt("int_value", intTmp);
}
// kettle bigint -> kudu bigint
longTmp=get(Fields.In, "bigint_value").getInteger(currentRow);
if (longTmp!=null)
{
kuduRow.addLong("bigint_value", longTmp);
}
// kettle date/timestamp -> kudu timestamp
dateTmp= get(Fields.In, "timestamp_value").getDate(currentRow);
if (dateTmp!=null)
{
longTmp=dateTmp.getTime()+8*3600*1000; //转到东8区时间
kuduRow.addLong("timestamp_value", longTmp*1000);
}
// kettle boolean -> kudu int
booleanTmp= get(Fields.In, "boolean_value").getBoolean(currentRow);
if (booleanTmp!=null)
{
intTmp=0;
if (booleanTmp)
{intTmp=1;}
kuduRow.addInt("boolean_value", intTmp);
}
// 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
uncommit = uncommit + 1;
if (uncommit > OPERATION_BATCH / 2) {
session.flush();
uncommit = 0;
}
session.apply(insert);
previousRow=currentRow;
currentRow=getRow();
}
// 对于手工提交, 保证完成最后的提交
if (uncommit > 0) {
session.flush();
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
// Send the row on to the next step.
//putRow(data.outputRowMeta, currentRow);
return false;
}
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
try {
client = new KuduClient.KuduClientBuilder(KUDU_SERVERS).build();
session = client.newSession();
table =client.openTable(KUDU_TABLE);
mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
} catch (Exception e) {
e.printStackTrace();
throw e;
}
return parent.initImpl(stepMetaInterface, stepDataInterface);
}
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
try {
if (!session.isClosed()) {
session.close();
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
parent.disposeImpl(smi, sdi);
}
文章标题:Kettle系列:使用Kudu API插入数据到Kudu中
文章链接:http://soscw.com/essay/75339.html