下面给出一条包含 2 层嵌套的 JSON 数据(`nested` 字段本身是一个对象):
{"metric":"smsSendSucessCout1","nested":{"clientId":"client-id","number":20},"time":1537075089042,"value":"1.0"}
要解析这种嵌套数据,其实只要定义好 schema 就可以了,下面直接上源码:
/**
 * Entity object holding a topic, a broker address, a group id and a list of field
 * definitions, from which a Flink {@code TableSchema} can be derived.
 *
 * @author liu.zhiqiang
 * @version $Id: AthenaxKafkaDTO.java, v 0.1 2018-09-12 18:14 liu.zhiqiang Exp $$
 */
@Data
public class AthenaxKafkaDTO {
    private String topic;
    private String brokerAddress;
    private String groupId;
    /** Top-level field definitions; a {@code null} list is treated as "no fields". */
    private List<AthenaxJSONSchema> schemas;

    // Related stack frame kept from the original notes (presumably where the produced
    // type information is consumed - confirm):
    // org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:97)

    /**
     * Maps the textual type name of a field definition to its Flink type information.
     *
     * <p>The type name is matched case-insensitively. {@code row} is the only composite
     * type supported so far; its children are resolved recursively.
     *
     * @param schema the field definition to resolve
     * @return the type information matching {@code schema.getType()}
     * @throws TableNotExistException if the type name is missing or not supported
     */
    private TypeInformation<?> getTypeInfo(AthenaxJSONSchema schema) throws TableNotExistException {
        String type = schema.getType();
        // Normalise once, and with Locale.ROOT: the default-locale toLowerCase() breaks the
        // match under locales with special casing rules (e.g. Turkish "I" -> dotless "i").
        // A missing type maps to "" so that it ends up in the "unsupported" branch below
        // instead of failing with a bare NullPointerException.
        String normalized = (type == null) ? "" : type.toLowerCase(java.util.Locale.ROOT);
        switch (normalized) {
            case "string":
                return BasicTypeInfo.STRING_TYPE_INFO;
            case "boolean":
                return BasicTypeInfo.BOOLEAN_TYPE_INFO;
            case "byte":
                return BasicTypeInfo.BYTE_TYPE_INFO;
            case "short":
                return BasicTypeInfo.SHORT_TYPE_INFO;
            case "int":
                return BasicTypeInfo.INT_TYPE_INFO;
            case "long":
                return BasicTypeInfo.LONG_TYPE_INFO;
            case "float":
                return BasicTypeInfo.FLOAT_TYPE_INFO;
            case "double":
                return BasicTypeInfo.DOUBLE_TYPE_INFO;
            case "char":
                return BasicTypeInfo.CHAR_TYPE_INFO;
            case "void":
                return BasicTypeInfo.VOID_TYPE_INFO;
            case "biginteger":
                return BasicTypeInfo.BIG_INT_TYPE_INFO;
            case "bigdecimal":
                return BasicTypeInfo.BIG_DEC_TYPE_INFO;
            case "date":
                return SqlTimeTypeInfo.DATE;
            case "timestamp":
                return SqlTimeTypeInfo.TIMESTAMP;
            case "time":
                return SqlTimeTypeInfo.TIME;
            case "row":
                // Only "row" is supported as a composite type for now; others may follow.
                return buildRowTypeInfo(schema);
            default: {
                String errorMsg = "fail to find TypeInformation for type[" + type + "]";
                LoggerUtil.error(errorMsg);
                throw new TableNotExistException(type, type);
            }
        }
    }

    /**
     * Builds the row type information for a {@code row} field from its child definitions.
     *
     * @param schema a field definition whose type is {@code row}
     * @return a row type with one field per child; an empty row when there are no children
     * @throws TableNotExistException if any child has an unsupported type
     */
    private TypeInformation<?> buildRowTypeInfo(AthenaxJSONSchema schema) throws TableNotExistException {
        List<AthenaxJSONSchema> children = schema.getSchema();
        int size = (children == null) ? 0 : children.size();
        String[] names = new String[size];
        TypeInformation<?>[] types = new TypeInformation<?>[size];
        if (children != null) {
            int index = 0;
            for (AthenaxJSONSchema child : children) {
                names[index] = child.getKey();
                types[index++] = getTypeInfo(child);
            }
        }
        return Types.ROW(names, types);
    }

    /**
     * Builds the table schema from the configured top-level field definitions.
     *
     * <p>Example of an equivalent hand-written schema:
     * <pre>
     * new TableSchema(new String[] { "id", "proctime" },
     *     new TypeInformation&lt;?&gt;[] { BasicTypeInfo.INT_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP });
     * </pre>
     *
     * @return a schema with one column per entry of {@code schemas}, in list order; a schema
     *         without columns when {@code schemas} is {@code null} or empty
     * @throws TableNotExistException if any field has an unsupported type
     */
    public TableSchema getTableSchema() throws TableNotExistException {
        // A null list is handled like an empty one, matching how nested rows treat a
        // missing child list.
        int size = (schemas == null) ? 0 : schemas.size();
        String[] keys = new String[size];
        TypeInformation<?>[] types = new TypeInformation<?>[size];
        if (schemas != null) {
            int index = 0;
            for (AthenaxJSONSchema schema : schemas) {
                keys[index] = schema.getKey();
                types[index++] = getTypeInfo(schema);
            }
        }
        return new TableSchema(keys, types);
    }

    /**
     * Finds the key of the first top-level field whose {@code time} marker equals 1.
     *
     * <p>Only first-level definitions are inspected for now; nested rows are not searched.
     *
     * @return the key of the first marked field, or {@code null} if none is marked or no
     *         fields are configured
     */
    public String getTimeField() {
        if (schemas == null) {
            return null;
        }
        for (AthenaxJSONSchema schema : schemas) {
            if (1 == schema.getTime()) {
                return schema.getKey();
            }
        }
        return null;
    }
}
然后 SQL 可以这样定义(嵌套字段用 `nested.number` 的形式访问):
SELECT SUM(nested.number) as nestedNumber,hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,COUNT(`value`) as `count`,AVG(CAST(`value` AS DOUBLE)) as `avg`,MAX(CAST(`value` AS DOUBLE)) as `max`,MIN(CAST(`value` AS DOUBLE)) as `min`,MAX(`time`) as `time` FROM input.tumble_topic_input_15 WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL GROUP BY metric,TUMBLE(`time`, INTERVAL '3' SECOND)
schema定义如下:
"schemas": [{"key": "metric","type": "string"},{"key": "time","type": "timestamp","time": 1},{"key":"value","type": "string"},{"key": "nested","type": "row","schema":[{"key": "clientId","type": "string"},{"key": "number","type": "int"}]}]
测试通过!