Flink on YARN部署快速入门指南

Stella981
• 阅读 1497

Apache Flink是一个高效、分布式、基于Java和Scala(主要是由Java实现)实现的通用大数据分析引擎,它具有分布式 MapReduce一类平台的高效性、灵活性和扩展性以及并行数据库查询优化方案,它支持批量和基于流的数据分析,且提供了基于Java和Scala的API。

  从Flink官方文档可以知道,目前Flink支持三大部署模式:Local、Cluster以及Cloud,如下图所示:

Flink on YARN部署快速入门指南

  本文将简单地介绍如何部署Apache Flink On YARN(也就是如何在YARN上运行Flink作业),本文是基于Apache Flink 1.0.0以及Hadoop 2.2.0。

  在YARN上启动一个Flink主要有两种方式:(1)、启动一个YARN session(Start a long-running Flink cluster on YARN);(2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)。下面将分别进行介绍。

  这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和TaskManagers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)。我们可以通过./bin/yarn-session.sh脚本启动YARN Session,由于我们第一次使用这个脚本,我们先看看这个脚本支持哪些参数:

[flink]$ . /bin/yarn-session .sh

Usage:

Required

-n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)

Optional

-D <arg>                        Dynamic properties

-d,--detached                   Start detached

-jm,--jobManagerMemory <arg>    Memory for JobManager Container [ in MB]

-nm,--name <arg>                Set a custom name for the application on YARN

-q,--query                      Display available YARN resources (memory, cores)

-qu,--queue <arg>               Specify YARN queue.

-s,--slots <arg>                Number of slots per TaskManager

-st,--streaming                 Start Flink in streaming mode

-tm,--taskManagerMemory <arg>   Memory per TaskManager Container [ in MB]

各个参数的含义里面已经介绍的很详细了。在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个。好了,我们开启动一个YARN session吧:

. /bin/yarn-session .sh -n 4 -tm 8192 -s 8

上面命令启动了4个TaskManager,每个TaskManager内存为8G且占用了8个核(是每个TaskManager,默认是1个核)。在启动YARN session的时候会加载conf/flink-config.yaml配置文件,我们可以根据自己的需求去修改里面的相关参数(关于里面的参数含义请参见Flink官方文档介绍吧)。一切顺利的话,我们可以在https://www.iteblog.com:9981/proxy/application_1453101066555_2766724/#/overview上看到类似于下面的页面:

Flink on YARN部署快速入门指南

启动了YARN session之后我们如何运行作业呢?很简单,我们可以使用./bin/flink脚本提交作业,同样我们来看看这个脚本支持哪些参数:

[iteblog @ www.iteblog.com flink- 1.0 . 0 ]$ bin/flink

./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available :

Action "run" compiles and runs a program.

Syntax : run [OPTIONS] <jar-file> <arguments>

"run" action options :

-c,-- class <classname>               Class with the program entry point

( "main" method or "getPlan()" method.

Only needed if the JAR file does not

specify the class in its manifest.

-C,--classpath <url>                 Adds a URL to each user code

classloader  on all nodes in the

cluster. The paths must specify a

protocol (e.g. file : //) and be

accessible on all nodes (e.g. by means

of a NFS share). You can use this

option multiple times for specifying

more than one URL. The protocol must

be supported by the { @ link

java.net.URLClassLoader}.

-d,--detached                        If present, runs the job in detached

mode

-m,--jobmanager <host : port>          Address of the JobManager (master) to

which to connect. Specify

'yarn-cluster' as the JobManager to

deploy a YARN cluster for the job. Use

this flag to connect to a different

JobManager than the one specified in

the configuration.

-p,--parallelism <parallelism>       The parallelism with which to run the

program. Optional flag to override the

default value specified in the

configuration.

-q,--sysoutLogging                   If present, supress logging output to

standard out.

-s,--fromSavepoint <savepointPath>   Path to a savepoint to reset the job

back to ( for example

file : ///flink/savepoint-1537).

我们可以使用run选项运行Flink作业。这个脚本可以自动获取到YARN session的地址,所以我们可以不指定--jobmanager参数。我们以Flink自带的WordCount程序为例进行介绍,先将测试文件上传到HDFS上:

hadoop fs -copyFromLocal LICENSE hdfs : ///user/iteblog/

然后将这个文件作为输入并运行WordCount程序:

./bin/flink run ./examples/batch/WordCount.jar --input hdfs : ///user/iteblog/LICENSE

一切顺利的话,可以看到在终端会显示出计算的结果:

( 0 , 9 )

( 1 , 6 )

( 10 , 3 )

( 12 , 1 )

( 15 , 1 )

( 17 , 1 )

( 2 , 9 )

( 2004 , 1 )

( 2010 , 2 )

( 2011 , 2 )

( 2012 , 5 )

( 2013 , 4 )

( 2014 , 6 )

( 2015 , 7 )

( 2016 , 2 )

( 3 , 6 )

( 4 , 4 )

( 5 , 3 )

( 50 , 1 )

( 6 , 3 )

( 7 , 3 )

( 8 , 2 )

( 9 , 2 )

(a, 25 )

(above, 4 )

(acceptance, 1 )

(accepting, 3 )

(act, 1 )

如果我们不想将结果输出在终端,而是保存在文件中,可以使用--output参数指定保存结果的地方:

./bin/flink run ./examples/batch/WordCount.jar     \

--input hdfs : ///user/iteblog/LICENSE     \

--output hdfs : ///user/iteblog/result.txt

然后我们可以到hdfs:///user/iteblog/result.txt文件里面查看刚刚运行的结果。

  需要注意的是:1、上面的--input--output参数并不是Flink内部的参数,而是WordCount程序中定义的;
  2、指定路径的时候一定记得需要加上模式,比如上面的hdfs://,否者程序会在本地寻找文件。

  上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业。这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar      \

--input hdfs : ///user/iteblog/LICENSE                            \

--output hdfs : ///user/iteblog/result.txt

上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须指定。

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这