/**
 * Entity object.
 *
 * @author liu.zhiqiang
 * @version $Id: AthenaxKafkaDTO.java, v 0.1 2018-09-12 18:14 liu.zhiqiang Exp $$
 */
public class AthenaxKafkaDTO {
/** Topic name. Presumably the Kafka topic backing this table; its use is not visible here (TODO confirm). */
private String topic;
/** Broker address. Presumably the Kafka bootstrap server list; format not visible here (TODO confirm). */
private String brokerAddress;
/** Group id. Presumably the Kafka consumer group; its use is not visible here (TODO confirm). */
private String groupId;
/**
 * Field definitions of the table. Read by {@code getTableSchema()} to derive column names and
 * types, and by {@code getTimeField()} to find the entry whose {@code getTime()} equals 1.
 */
private List<AthenaxJSONSchema> schemas;
// org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:97)
/**
 * Maps the type name declared on a schema entry to a {@code TypeInformation}.
 *
 * <p>The name returned by {@code schema.getType()} is lower-cased and compared against a fixed
 * set of names: string, boolean, byte, short, int, long, float, double, char, void, biginteger,
 * bigdecimal, date, timestamp, time and row. For "row" the nested entries from
 * {@code schema.getSchema()} are used to build a composite type via {@code Types.ROW}.
 *
 * @param schema the schema entry whose type name (and, for "row", nested schema) is read
 * @return the type information matching the declared type name
 * @throws TableNotExistException if the type name matches none of the supported names
 */
private TypeInformation<?> getTypeInfo(AthenaxJSONSchema schema) throws TableNotExistException {
String type = schema.getType();
// NOTE(review): toLowerCase() without an explicit Locale uses the JVM default locale, so the
// comparison can fail for names containing 'I' under e.g. a Turkish locale; consider Locale.ROOT.
// NOTE(review): a null type name would throw a NullPointerException on the first comparison.
if (type.toLowerCase().equals("string")) {
return BasicTypeInfo.STRING_TYPE_INFO;
} else if (type.toLowerCase().equals("boolean")) {
return BasicTypeInfo.BOOLEAN_TYPE_INFO;
} else if (type.toLowerCase().equals("byte")) {
return BasicTypeInfo.BYTE_TYPE_INFO;
} else if (type.toLowerCase().equals("short")) {
return BasicTypeInfo.SHORT_TYPE_INFO;
} else if (type.toLowerCase().equals("int")) {
return BasicTypeInfo.INT_TYPE_INFO;
} else if (type.toLowerCase().equals("long")) {
return BasicTypeInfo.LONG_TYPE_INFO;
} else if (type.toLowerCase().equals("float")) {
return BasicTypeInfo.FLOAT_TYPE_INFO;
} else if (type.toLowerCase().equals("double")) {
return BasicTypeInfo.DOUBLE_TYPE_INFO;
} else if (type.toLowerCase().equals("char")) {
return BasicTypeInfo.CHAR_TYPE_INFO;
} else if (type.toLowerCase().equals("void")) {
return BasicTypeInfo.VOID_TYPE_INFO;
} else if (type.toLowerCase().equals("biginteger")) {
return BasicTypeInfo.BIG_INT_TYPE_INFO;
} else if (type.toLowerCase().equals("bigdecimal")) {
return BasicTypeInfo.BIG_DEC_TYPE_INFO;
} else if (type.toLowerCase().equals("date")) {
return SqlTimeTypeInfo.DATE;
} else if (type.toLowerCase().equals("timestamp")) {
return SqlTimeTypeInfo.TIMESTAMP;
} else if (type.toLowerCase().equals("time")) {
return SqlTimeTypeInfo.TIME;
} else if (type.toLowerCase().equals("row")) {
// Composite type: collect the nested field names and types, then build a row type from them.
List<AthenaxJSONSchema> schemaList = schema.getSchema();
List<String> nameList = new ArrayList<String>();
List<TypeInformation<?>> typeInformationList = new ArrayList<TypeInformation<?>>();
// A missing nested schema leaves both lists empty.
if (null != schemaList) {
// NOTE(review): the body of this loop is not visible in this view; presumably it appends each
// child's key to nameList and its resolved type to typeInformationList — confirm in the full file.
for (AthenaxJSONSchema childSchema : schemaList) {
// Arrays are sized from the collected lists.
String[] nameArray = new String[nameList.size()];
// NOTE(review): this statement is truncated in this view, and no code copying the lists into
// the arrays is visible — confirm against the full file.
TypeInformation<?>[] typeInformationArray = new TypeInformation<?>[typeInformationList
//return new RowTypeInfo();
return Types.ROW(nameArray, typeInformationArray);
} else {
// NOTE(review): errorMsg is built but never used; the exception below receives (type, type).
String errorMsg = "fail to find TypeInformation for type[" + type + "]";
throw new TableNotExistException(type, type);
// TableSchema SCHEMA = new TableSchema(new String[] { "id", "proctime" },
// new TypeInformation<?>[] { BasicTypeInfo.INT_TYPE_INFO,
// SqlTimeTypeInfo.TIMESTAMP });
public TableSchema getTableSchema() throws TableNotExistException {
String[] keys = new String[schemas.size()];
TypeInformation<?>[] types = new TypeInformation<?>[schemas.size()];
int index = 0;
for (AthenaxJSONSchema schema : schemas) {
keys[index] = schema.getKey();
types[index++] = getTypeInfo(schema);
return new TableSchema(keys, types);
public String getTimeField() {
for (AthenaxJSONSchema schema : schemas) {
if (1 == schema.getTime()) {
return schema.getKey();
return null;
/*
 * Sample query and a matching "schemas" configuration (reference material, not code):
 *
 * SELECT SUM(nested.number) as nestedNumber,hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,COUNT(`value`) as `count`,AVG(CAST(`value` AS DOUBLE)) as `avg`,MAX(CAST(`value` AS DOUBLE)) as `max`,MIN(CAST(`value` AS DOUBLE)) as `min`,MAX(`time`) as `time` FROM input.tumble_topic_input_15 WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL GROUP BY metric,TUMBLE(`time`, INTERVAL '3' SECOND)
 *
 * "schemas": [{"key": "metric","type": "string"},{"key": "time","type": "timestamp","time": 1},{"key":"value","type": "string"},{"key": "nested","type": "row","schema":[{"key": "clientId","type": "string"},{"key": "number","type": "int"}]}]
 */