Flink使用RestApi

Stella981
• 阅读 713

flink是一个非常好用的流任务计算框架, 这次我们来试用flink的restApi来提交任务. 主要阐述几个常用的restapi, 包括上传jar包, 查询jar包, 提交任务, 查询任务, 删除任务等,
其它的比如删除jar包, 查询jobmanager, 查询taskmanager等等, 类推就可以得出了, 不在这里进行重复介绍了

1, 上传jar包

   public static boolean uploadJar( File jarFile) {
        RequestBody requestBody = new MultipartBody.Builder()
                .setType(MultipartBody.FORM)
                .addFormDataPart("file", jarFile.getName(),
                    RequestBody.create(MediaType.parse("multipart/form-data"), jarFile))
                .build();

        Request request = new Request.Builder()
                .url("http://host:port/jars/upload")
                .addHeader(userAgent, userAgentVal)
                .post(requestBody)
                .build();
       Response resp = OkHttpUtils.execute(request);
            if (OK == resp.code()) {
                    JSONObject body = JSON.parseObject(resp.body().string());
                    if ("success".equals(body.getString("status"))) {
                        return true;
                }
            }
        return false;
    }

2, 查询jar包

  Request request = new Request.Builder()
                .url("http://host:port/jars")
                .addHeader(userAgent, userAgentVal)
                .get()
                .build();
  Response response = OkHttpUtils.execute(request);
  String body = response.body().string();    

3,提交任务
(特别提示: 提交任务时, Main方法中,容易出现参数解析异常, 为了解决这一个问题, 强烈建议, 对参数进行编码转换, 对programArgs参数进行URLEncoder.encode(参数值, “utf-8”), 然后再在flink运行jar包, 进行解码.

       String baseUrl = "http://host:port/jars/${jarId}/run";
        Map<String, String> params = new HashMap<>();
        params.put("programArgs", "xxxxxx");
        params.put("entryClass", "com.xx.oo.JsonMain");
        params.put("parallelism", "2");
        params.put("savepointPath", null);
        Request request = new Request.Builder()
                .url(baseUrl)
                .addHeader(userAgent, userAgentVal)
                .post(RequestBody.create(JSON.toJSONString(params), MEDIA_TYPE_JSON))
                .build();
        Response resp = OkHttpUtils.execute(request);
        String respBody = resp.body().string();
        if (OK == resp.code()) {
                JSONObject body = JSON.parseObject(respBody);
                return body.getString("jobid");
        }

4,查询任务

       String url= "http://host:port/jobs";
       Request request = new Request.Builder()
                .url(url)
                .addHeader(userAgent, userAgentVal)
                .get()
                .build();
        Response resp = OkHttpUtils.execute(request);
        if (OK == resp.code()) {
            JSONObject body = JSON.parseObject(resp.body().string());
            if (body.containsKey("jobs")) {
                JSONArray jobs = body.getJSONArray("jobs");
                for (int i = 0; i < jobs.size(); i++) {
                    JSONObject jsb = jobs.getJSONObject(i);
                    String id = jsb.getString("id");
                    String status = jsb.getString("status");
                }
            }
        }else{
            logger.error("queryJobByHttp "+resp.body().string());
        }

4,删除任务

       String url= "http://host:port/jobs/${jobId}";
        Request request = new Request.Builder()
                .url(baseUrl)
                .addHeader(userAgent, userAgentVal)
                .patch(RequestBody.create("{}", MEDIA_TYPE_JSON))
                .build();
            Response resp = OkHttpUtils.execute(request);
            if (ACCEPTED == resp.code()) {
                return jobId;
            }
        return null;
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Wesley13 Wesley13
3年前
JavaWeb 调用接口
JavaWeb 如何调用接口CreateTime2018年4月2日19:04:29Author:Marydon1.所需jar包!(https://oscimg.oschina.net/oscnet/0f139
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Java服务总在半夜挂,背后的真相竟然是... | 京东云技术团队
最近有用户反馈测试环境Java服务总在凌晨00:00左右挂掉,用户反馈Java服务没有定时任务,也没有流量突增的情况,Jvm配置也合理,莫名其妙就挂了
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这