使用Apache Kudu客户端,对KafkaConnect Sink 进行扩展。
使用的Apache Kudu 的Java 客户端。突然有天发现作业无法提交,一直报错。
后来才发现这是Kudu自身的一种校验机制。为了忽略这种校验机制,更符合我们的SQL习惯,我对代码做了改造。
而在Kudu的提交配置上,使用了手动提交的配置。而且我也建议使用手动提交的配置,这样效率更好,提交后对于异常数据的处理更加完整。
配置方式如下:
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
public void flush() throws KuduException {
final List<OperationResponse> responses = session.flush();
for (OperationResponse response : responses) {
if(!response.hasRowError()){ // 没错误 继续运行
continue;
}
String errorStr = response.getRowError().toString();
if(errorStr.contains("key not found")){
log.warn("encounter key not found error.More details =>"
+ " table: " + response.getRowError().getOperation().getTable().getName()
+ " row:" + response.getRowError().getOperation().getRow().stringifyRowKey());
continue;
}
throw new ConnectException("Failed to flush one or more changes. " +
"Transaction rolled back: "
+ response.getRowError().toString() + " oper info ->"
+ " table: " + response.getRowError().getOperation().getTable().getName()
+ " row:" + response.getRowError().getOperation().getRow().stringifyRowKey()
);
}
}