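Flink is a very convenient stream-processing framework. This time we try submitting jobs through Flink's REST API. The post covers the most commonly used endpoints: uploading a jar, listing jars, submitting a job, querying jobs, and deleting (cancelling) a job.
The others, such as deleting a jar or querying the JobManager and the TaskManagers, follow the same pattern, so they are not repeated here.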
1, Upload a jar
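// Uses OkHttp 4 (okhttp3.*) and fastjson (com.alibaba.fastjson.*).
// userAgent, userAgentVal, OK (presumably 200) and OkHttpUtils are the author's helpers, not shown here.
public static boolean uploadJar(File jarFile) throws IOException {
    // The jar is sent as multipart data; the Flink docs ask for
    // Content-Type application/x-java-archive on the jar part.
    RequestBody requestBody = new MultipartBody.Builder()
            .setType(MultipartBody.FORM)
            .addFormDataPart("file", jarFile.getName(),
                    RequestBody.create(jarFile, MediaType.parse("application/x-java-archive")))
            .build();
    Request request = new Request.Builder()
            .url("http://host:port/jars/upload")
            .addHeader(userAgent, userAgentVal)
            .post(requestBody)
            .build();
    // try-with-resources closes the response even when the status is not OK
    try (Response resp = OkHttpUtils.execute(request)) {
        if (OK == resp.code()) {
            JSONObject body = JSON.parseObject(resp.body().string());
            // Flink answers with "status": "success" once the jar is stored
            return "success".equals(body.getString("status"));
        }
    }
    return false;
}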
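The submit step further down needs a jar ID. As far as I know, a successful upload response also carries a `filename` field with the server-side path of the stored jar, and the last segment of that path is the ID that `/jars/${jarId}/run` expects. The following is only a sketch under that assumption: `JarIds` and `jarIdFromFilename` are hypothetical names, and the sample path is made up.

```java
final class JarIds {
    private JarIds() {}

    /**
     * Returns the last path segment of the "filename" value from the upload response.
     * Assumption: Flink reports the stored jar as a server-side path and uses the
     * file name itself as the jar ID.
     */
    static String jarIdFromFilename(String filename) {
        if (filename == null || filename.isEmpty()) {
            throw new IllegalArgumentException("filename must not be empty");
        }
        // The path comes from the server, so accept both '/' and '\' as separators.
        int cut = Math.max(filename.lastIndexOf('/'), filename.lastIndexOf('\\'));
        return filename.substring(cut + 1);
    }

    public static void main(String[] args) {
        // Made-up sample path, for illustration only.
        System.out.println(jarIdFromFilename("/tmp/flink-web-upload/1234_demo-job.jar"));
    }
}
```

With fastjson this would be called as `JarIds.jarIdFromFilename(body.getString("filename"))` right after the status check. If the assumption does not hold for your Flink version, the `id` values returned by the jar listing in the next step are the safer source.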
2, List the uploaded jars
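// GET /jars lists the uploaded jars; each entry of the "files" array carries the jar's "id".
Request request = new Request.Builder()
        .url("http://host:port/jars")
        .addHeader(userAgent, userAgentVal)
        .get()
        .build();
Response response = OkHttpUtils.execute(request);
// The raw JSON body; parse it with JSON.parseObject(body) to read the "files" array.
String body = response.body().string();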
3, Submit a job
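(Special note: when a job is submitted this way, the main method is prone to argument-parsing errors, for example when a value contains spaces or quotes. To avoid this, it is strongly recommended to encode the programArgs value with URLEncoder.encode(value, "utf-8") before submitting, and to decode it again inside the jar when Flink runs it.)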
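The encode/decode round trip from the note can be sketched as below. This is a minimal illustration, not the author's code: `ProgramArgsCodec` is a hypothetical helper, the sample JSON argument is made up, and the `Charset` overloads used here need Java 10 or later (on older JDKs the `"utf-8"` string overloads from the note do the same job).

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class ProgramArgsCodec {
    private ProgramArgsCodec() {}

    /** Submitter side: turn an arbitrary argument into a token without spaces or quotes. */
    static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8);
    }

    /** Job side: restore the original argument at the start of main(). */
    static String decode(String encoded) {
        return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up argument containing the characters that tend to break parsing.
        String raw = "{\"topic\":\"orders\",\"name\":\"a b\"}";
        String encoded = encode(raw);
        System.out.println(encoded);
        System.out.println(decode(encoded).equals(raw));
    }
}
```

On the submitting side the encoded string is what goes into `programArgs`; inside the job, the first thing main does is call `ProgramArgsCodec.decode(args[0])` (or the equivalent for each argument) before any further parsing.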
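// ${jarId} is a placeholder: Java does not interpolate it, so substitute the ID of the uploaded jar.
String baseUrl = "http://host:port/jars/${jarId}/run";
Map<String, String> params = new HashMap<>();
params.put("programArgs", "xxxxxx");              // URL-encoded arguments, see the note above
params.put("entryClass", "com.xx.oo.JsonMain");   // fully qualified main class
params.put("parallelism", "2");
params.put("savepointPath", null);                // optional: savepoint to restore from
// MEDIA_TYPE_JSON is the author's constant, presumably MediaType.parse("application/json")
Request request = new Request.Builder()
        .url(baseUrl)
        .addHeader(userAgent, userAgentVal)
        .post(RequestBody.create(JSON.toJSONString(params), MEDIA_TYPE_JSON))
        .build();
Response resp = OkHttpUtils.execute(request);
String respBody = resp.body().string();
if (OK == resp.code()) {
    JSONObject body = JSON.parseObject(respBody);
    // On success Flink returns the ID of the newly started job
    return body.getString("jobid");
}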
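Since `${jarId}` in the URL above is only a placeholder, the request has to be assembled from the real ID at run time. Here is a small sketch of that step using only the JDK (`java.net.http`, Java 11+) instead of OkHttp and fastjson. `RunRequests`, its methods and the sample values are hypothetical; the body is built by hand, and `parallelism` is sent as a JSON number rather than a string, which is my choice and not something the original code does.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RunRequests {
    private RunRequests() {}

    /** Builds the JSON body by hand so the sketch needs no JSON library. */
    static String body(String entryClass, int parallelism, String encodedArgs) {
        return "{\"entryClass\":\"" + escape(entryClass) + "\","
                + "\"parallelism\":" + parallelism + ","
                + "\"programArgs\":\"" + escape(encodedArgs) + "\"}";
    }

    /** Escapes backslashes and double quotes; enough for URL-encoded arguments and class names. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** baseUrl is something like http://host:port, jarId the ID of the uploaded jar. */
    static HttpRequest runRequest(String baseUrl, String jarId, String jsonBody) {
        return HttpRequest.newBuilder()
                // Assumption: jarId contains only URL-safe characters; URI.create throws otherwise.
                .uri(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the `jobid` field read from the response body exactly as in the OkHttp version.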
4, Query jobs
// GET /jobs returns an overview of all jobs with their current status
String url = "http://host:port/jobs";
Request request = new Request.Builder()
        .url(url)
        .addHeader(userAgent, userAgentVal)
        .get()
        .build();
Response resp = OkHttpUtils.execute(request);
if (OK == resp.code()) {
    JSONObject body = JSON.parseObject(resp.body().string());
    if (body.containsKey("jobs")) {
        JSONArray jobs = body.getJSONArray("jobs");
        for (int i = 0; i < jobs.size(); i++) {
            JSONObject jsb = jobs.getJSONObject(i);
            String id = jsb.getString("id");          // job ID
            String status = jsb.getString("status");  // e.g. RUNNING
        }
    }
} else {
    logger.error("queryJobByHttp " + resp.body().string());
}
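The loop above reads each job's `id` and `status` but does nothing with them yet. One typical use is to decide whether a job is finished for good or still needs watching. A minimal sketch, assuming that FINISHED, CANCELED and FAILED are the final states Flink reports (to my knowledge these are its globally terminal job states); `JobStates` and `isTerminal` are hypothetical names.

```java
import java.util.Locale;
import java.util.Set;

final class JobStates {
    private JobStates() {}

    // Assumption: these three status strings mean the job will not run again on its own.
    private static final Set<String> TERMINAL = Set.of("FINISHED", "CANCELED", "FAILED");

    /** Returns true when the status string from /jobs denotes a finished, cancelled or failed job. */
    static boolean isTerminal(String status) {
        return status != null && TERMINAL.contains(status.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        for (String s : new String[] {"RUNNING", "FINISHED", "RESTARTING", "FAILED"}) {
            System.out.println(s + " terminal=" + isTerminal(s));
        }
    }
}
```

Inside the loop this becomes `if (!JobStates.isTerminal(status)) { ... }`, for example to collect the IDs of jobs that are still active.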
5, Delete (cancel) a job
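// ${jobId} is a placeholder for the ID returned when the job was submitted
String url = "http://host:port/jobs/${jobId}";
Request request = new Request.Builder()
        .url(url)
        .addHeader(userAgent, userAgentVal)
        // PATCH on /jobs/{jobId} asks Flink to cancel the job
        .patch(RequestBody.create("{}", MEDIA_TYPE_JSON))
        .build();
Response resp = OkHttpUtils.execute(request);
// ACCEPTED is the author's constant, presumably 202: the cancellation was accepted, not yet completed
if (ACCEPTED == resp.code()) {
    return jobId;
}
return null;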
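For comparison, the same cancel call can be built with the JDK's own HTTP client (Java 11+), which supports PATCH through the generic `method(...)` builder call. This is a sketch only: `CancelRequests` and its methods are hypothetical names, the job ID in the usage line is made up, and the empty JSON body simply mirrors the OkHttp snippet above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class CancelRequests {
    private CancelRequests() {}

    /** Builds PATCH {baseUrl}/jobs/{jobId} with an empty JSON object as body. */
    static HttpRequest cancelRequest(String baseUrl, String jobId) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/jobs/" + jobId))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString("{}"))
                .build();
    }

    /** 202 Accepted means Flink took the request; the job may still be shutting down. */
    static boolean accepted(int statusCode) {
        return statusCode == 202;
    }
}
```

A call such as `CancelRequests.cancelRequest("http://host:port", jobId)` yields the request; after sending it, a follow-up query of `/jobs` shows when the job has actually reached its final state.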