Flink Technical Notes

Stella981

First, generate the Flink sample project from the Maven archetype:

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.7.2                         \
      -DarchetypeCatalog=local

Batch processing from a file

Create a file named hello.txt with the following content:

hello world welcome
hello welcome

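Word count

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJavaApp {
    public static void main(String[] args) throws Exception {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.readTextFile(input);
        // Print the raw lines first
        text.print();
        // Split each line into (word, 1) pairs, group by the word and sum the counts
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }).groupBy(0).sum(1).print();
    }
}

Output (logs omitted)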

hello welcome
hello world welcome
(world,1)
(hello,2)
(welcome,2)

Pure Java implementation

File reading class

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Locale;
import java.util.Scanner;

public class FileOperation {
    /**
     * Reads the file named filename and adds every word it contains to words.
     *
     * @param filename path of the file to read
     * @param words    list that receives the lower-cased words
     * @return true if the file was read, false otherwise
     */
    public static boolean readFile(String filename, List<String> words) {
        if (filename == null || words == null) {
            System.out.println("filename or words is null");
            return false;
        }
        Scanner scanner;
        try {
            File file = new File(filename);
            if (file.exists()) {
                FileInputStream fis = new FileInputStream(file);
                scanner = new Scanner(new BufferedInputStream(fis), "UTF-8");
                scanner.useLocale(Locale.ENGLISH);
            } else {
                return false;
            }
        } catch (FileNotFoundException e) {
            System.out.println("Cannot open " + filename);
            return false;
        }
        // Simple tokenization: a word is a maximal run of letters
        if (scanner.hasNextLine()) {
            String contents = scanner.useDelimiter("\\A").next();
            int start = firstCharacterIndex(contents, 0);
            for (int i = start + 1; i <= contents.length(); ) {
                if (i == contents.length() || !Character.isLetter(contents.charAt(i))) {
                    String word = contents.substring(start, i).toLowerCase();
                    words.add(word);
                    start = firstCharacterIndex(contents, i);
                    i = start + 1;
                } else {
                    i++;
                }
            }
        }
        scanner.close();
        return true;
    }

    // Returns the index of the first letter at or after start, or s.length() if there is none
    private static int firstCharacterIndex(String s, int start) {
        for (int i = start; i < s.length(); i++) {
            if (Character.isLetter(s.charAt(i))) {
                return i;
            }
        }
        return s.length();
    }
}

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;

public class BatchJavaOnly {
    public static void main(String[] args) {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        List<String> list = new ArrayList<>();
        FileOperation.readFile(input, list);
        System.out.println(list);
        // Count how often each token occurs
        Map<String, Integer> map = new HashMap<>();
        list.forEach(token -> {
            if (map.containsKey(token)) {
                map.put(token, map.get(token) + 1);
            } else {
                map.put(token, 1);
            }
        });
        // Tuple2 is used only for its (key,value) string form
        map.entrySet().stream()
                .map(entry -> new Tuple2<>(entry.getKey(), entry.getValue()))
                .forEach(System.out::println);
    }
}

Output

[hello, world, welcome, hello, welcome]
(world,1)
(hello,2)
(welcome,2)
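The hand-written counting loop above can also be expressed with the Java Stream API. The following is a minimal sketch and not part of the original project: the class name StreamWordCount and the method count are made up for illustration, and it splits on whitespace instead of using FileOperation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: word count with java.util.stream only (no Flink).
class StreamWordCount {
    // Lower-cases each line, splits it on whitespace and counts every token.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\s+")))
                .filter(token -> !token.isEmpty())
                // TreeMap keeps the keys sorted, so the printed order is stable
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world welcome", "hello welcome");
        System.out.println(count(lines)); // prints {hello=2, welcome=2, world=1}
    }
}
```

The flatMap, grouping and counting steps mirror the flatMap, groupBy and sum calls of the Flink job, which is why the two versions produce the same counts.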

Scala code

Generate the Scala sample project:

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeVersion=1.7.2                         \
      -DarchetypeCatalog=local

After installing the Scala plugin and configuring the Scala SDK:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object BatchScalaApp {
  def main(args: Array[String]): Unit = {
    val input = "/Users/admin/Downloads/flink/data/hello.txt"
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(input)
    text.flatMap(_.toLowerCase.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}

Output (logs omitted)

(world,1)
(hello,2)
(welcome,2)

For Scala basics, see the articles "Scala入门篇" (Scala primer) and "Scala入门之面向对象" (object-oriented Scala).

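Stream processing from a network socket

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingJavaApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        // Count words per key in 5-second windows
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .sum(1).print();
        // Streaming jobs only start when execute() is called
        env.execute("StreamingJavaApp");
    }
}

Open the port before running the job:

nc -lk 9999

Run the program, then type the following into the nc session: a a c d b c e e f a

Output (logs omitted)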

1> (e,2)
9> (d,1)
11> (a,3)
3> (b,1)
4> (f,1)
8> (c,2)
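The counts above are emitted once per 5-second window because timeWindow(Time.seconds(5)) with a single argument defines a tumbling (non-overlapping) window. As a rough illustration of how a timestamp maps to such a window, here is a plain-Java sketch. The class TumblingWindowSketch and its method are hypothetical, and the arithmetic assumes non-negative millisecond timestamps and windows aligned to the epoch with no offset; it is not Flink's own code.

```java
// Hypothetical illustration of tumbling-window assignment (not a Flink class).
class TumblingWindowSketch {
    // Returns the start of the window that contains the timestamp.
    // Assumes timestampMillis >= 0 and windows aligned to the epoch.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the job above
        long[] arrivals = {1_000L, 4_999L, 5_000L, 12_345L};
        for (long t : arrivals) {
            long start = windowStart(t, size);
            // Each window covers the half-open interval [start, start + size)
            System.out.println(t + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Elements whose timestamps fall into the same interval are summed together, so words typed more than five seconds apart end up in different result lines.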

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1", 9999)
    text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("StreamScalaApp")
  }
}

Output (logs omitted)

(c,2)
(b,1)
(d,1)
(f,1)
(e,2)
(a,3)

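Now we replace the tuple with a POJO class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5))
                .sum("count").print();
        env.execute("StreamingJavaApp");
    }
}

Output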

4> StreamObjJavaApp.WordCount(word=f, count=1)
11> StreamObjJavaApp.WordCount(word=a, count=3)
8> StreamObjJavaApp.WordCount(word=c, count=2)
1> StreamObjJavaApp.WordCount(word=e, count=2)
9> StreamObjJavaApp.WordCount(word=d, count=1)
3> StreamObjJavaApp.WordCount(word=b, count=1)

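We can also key the stream with a method reference instead of a field name

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

The argument of keyBy here is the functional interface KeySelector: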

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
    KEY getKey(IN value) throws Exception;
}
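To see why a method reference such as WordCount::getWord fits this interface, the sketch below rebuilds the idea in plain Java. Everything in it is hypothetical: KeyExtractor is a stand-in with the same single-method shape as KeySelector (it is not the Flink type), and sumByKey is an illustrative helper, not how Flink groups a keyed stream internally.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeySelectorSketch {
    // Hypothetical stand-in with the same shape as Flink's KeySelector.
    @FunctionalInterface
    interface KeyExtractor<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    static class WordCount {
        private final String word;
        private final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        String getWord() { return word; }

        int getCount() { return count; }
    }

    // Sums the counts of all records that share the key returned by the extractor.
    static <K> Map<K, Integer> sumByKey(List<WordCount> records, KeyExtractor<WordCount, K> extractor) {
        Map<K, Integer> totals = new LinkedHashMap<>();
        for (WordCount record : records) {
            try {
                totals.merge(extractor.getKey(record), record.getCount(), Integer::sum);
            } catch (Exception e) {
                // getKey may throw a checked exception, so wrap it for the caller
                throw new IllegalStateException("key extraction failed", e);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<WordCount> records = Arrays.asList(
                new WordCount("a", 1), new WordCount("b", 1), new WordCount("a", 1));
        // The method reference supplies the single abstract method getKey
        System.out.println(sumByKey(records, WordCount::getWord)); // prints {a=2, b=1}
    }
}
```

Because the interface has exactly one abstract method, any lambda or method reference that maps a record to its key can be passed where a key selector is expected.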

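flatMap can also be written as a lambda expression, but this is more cumbersome than the anonymous class: the generic type of the Collector is erased from a Java lambda, so the output type has to be supplied explicitly with returns(WordCount.class).

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap((FlatMapFunction<String, WordCount>) (value, collector) -> {
            String[] tokens = value.toLowerCase().split(" ");
            for (String token : tokens) {
                collector.collect(new WordCount(token, 1));
            }
        }).returns(WordCount.class)
                .keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

flatMap also accepts the abstract class RichFlatMapFunction: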

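import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new RichFlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}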

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamObjScalaApp {
  case class WordCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1", 9999)
    text.flatMap(_.split(" "))
      .map(WordCount(_, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")
      .print()
      .setParallelism(1)
    env.execute("StreamScalaApp")
  }
}

Output (logs omitted)

WordCount(b,1)
WordCount(d,1)
WordCount(e,2)
WordCount(f,1)
WordCount(a,3)
WordCount(c,2)

Data sources

Reading data from a collection

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetDataSourceApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        fromCollection(env);
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Output (logs omitted)

1
2
3
4
5
6
7
8
9
10

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    fromCollection(env)
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Output (logs omitted)

1
2
3
4
5
6
7
8
9
10

Reading data from a file

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetDataSourceApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // fromCollection(env);
        textFile(env);
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        env.readTextFile(filePath).print();
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Output (logs omitted)

hello welcome
hello world welcome

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // fromCollection(env)
    textFile(env)
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.readTextFile(filePath).print()
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Output (logs omitted)

hello welcome
hello world welcome

Reading data from a CSV file

Add a file named people.csv to the data directory with the following content:

name,age,job
Jorge,30,Developer
Bob,32,Developer

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetDataSourceApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // fromCollection(env);
        // textFile(env);
        csvFile(env);
    }

    public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/people.csv";
        // Skip the header line and read each row as a (String, Integer, String) tuple
        env.readCsvFile(filePath).ignoreFirstLine()
                .types(String.class, Integer.class, String.class)
                .print();
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        env.readTextFile(filePath).print();
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Output (logs omitted)

(Bob,32,Developer)
(Jorge,30,Developer)
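What ignoreFirstLine() and types(...) accomplish can be pictured with a small plain-Java sketch. It is only an illustration under simplifying assumptions: CsvSketch, Person and parse are hypothetical names, and the naive split(",") does not handle quoted fields or embedded commas the way a real CSV reader would.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "skip the header, then type each column".
class CsvSketch {
    static class Person {
        final String name;
        final int age;
        final String job;

        Person(String name, int age, String job) {
            this.name = name;
            this.age = age;
            this.job = job;
        }

        @Override
        public String toString() {
            return "(" + name + "," + age + "," + job + ")";
        }
    }

    // Parses comma-separated lines into typed rows, optionally skipping the header line.
    static List<Person> parse(List<String> lines, boolean ignoreFirstLine) {
        List<Person> people = new ArrayList<>();
        for (int i = ignoreFirstLine ? 1 : 0; i < lines.size(); i++) {
            String[] fields = lines.get(i).split(",");
            // Column 1 is converted to an int, like Integer.class in types(...)
            people.add(new Person(fields[0], Integer.parseInt(fields[1].trim()), fields[2]));
        }
        return people;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("name,age,job", "Jorge,30,Developer", "Bob,32,Developer");
        System.out.println(parse(lines, true)); // prints [(Jorge,30,Developer), (Bob,32,Developer)]
    }
}
```

Without skipping the header, the text "age" would reach Integer.parseInt and fail, which is the reason the header line has to be ignored before typed columns are read.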

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // fromCollection(env)
    // textFile(env)
    csvFile(env)
  }

  def csvFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/people.csv"
    env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.readTextFile(filePath).print()
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Output (logs omitted)

(Jorge,30,Developer)
(Bob,32,Developer)

Mapping the rows to a POJO class

import java.util.ArrayList;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetDataSourceApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class Case {
        private String name;
        private Integer age;
        private String job;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // fromCollection(env);
        // textFile(env);
        csvFile(env);
    }

    public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/people.csv";
        // env.readCsvFile(filePath).ignoreFirstLine()
        //         .types(String.class, Integer.class, String.class)
        //         .print();
        // Map the CSV columns, in order, to the named fields of Case
        env.readCsvFile(filePath).ignoreFirstLine()
                .pojoType(Case.class, "name", "age", "job")
                .print();
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        env.readTextFile(filePath).print();
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Output (logs omitted)

DataSetDataSourceApp.Case(name=Bob, age=32, job=Developer)
DataSetDataSourceApp.Case(name=Jorge, age=30, job=Developer)

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  case class Case(name: String, age: Int, job: String)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // fromCollection(env)
    // textFile(env)
    csvFile(env)
  }

  def csvFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/people.csv"
    // env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
    env.readCsvFile[Case](filePath, ignoreFirstLine = true, includedFields = Array(0, 1, 2))
      .print()
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.readTextFile(filePath).print()
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Output (logs omitted)

Case(Bob,32,Developer)
Case(Jorge,30,Developer)

Reading a directory recursively

Under the data directory, create two folders named 1 and 2, and copy hello.txt into each of them.

import java.util.ArrayList;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class DataSetDataSourceApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class Case {
        private String name;
        private Integer age;
        private String job;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // fromCollection(env);
        // textFile(env);
        // csvFile(env);
        readRecursiveFile(env);
    }

    public static void readRecursiveFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data";
        // Enable recursive enumeration so that files in nested folders are read as well
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        env.readTextFile(filePath).withParameters(parameters)
                .print();
    }

    public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/people.csv";
        // env.readCsvFile(filePath).ignoreFirstLine()
        //         .types(String.class, Integer.class, String.class)
        //         .print();
        env.readCsvFile(filePath).ignoreFirstLine()
                .pojoType(Case.class, "name", "age", "job")
                .print();
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        env.readTextFile(filePath).print();
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Output (logs omitted)

hello world welcome
hello world welcome
hello welcome
Jorge,30,Developer
name,age,job
hello world welcome
hello welcome
hello welcome
Bob,32,Developer

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object DataSetDataSourceApp {
  case class Case(name: String, age: Int, job: String)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // fromCollection(env)
    // textFile(env)
    // csvFile(env)
    readRecursiveFiles(env)
  }

  def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data"
    val parameters = new Configuration
    parameters.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filePath).withParameters(parameters).print()
  }

  def csvFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/people.csv"
    // env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
    env.readCsvFile[Case](filePath, ignoreFirstLine = true, includedFields = Array(0, 1, 2))
      .print()
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.readTextFile(filePath).print()
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Output (logs omitted)

hello world welcome
hello world welcome
hello welcome
Jorge,30,Developer
name,age,job
hello world welcome
hello welcome
hello welcome
Bob,32,Developer

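Reading compressed files

Create a folder named 3 under the data directory and compress hello.txt:

gzip hello.txt

This produces a new file, hello.txt.gz. Move that file into folder 3.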

import java.util.ArrayList;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class DataSetDataSourceApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class Case {
        private String name;
        private Integer age;
        private String job;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // fromCollection(env);
        // textFile(env);
        // csvFile(env);
        // readRecursiveFile(env);
        readCompresssionFile(env);
    }

    public static void readCompresssionFile(ExecutionEnvironment env) throws Exception {
        // Folder 3 contains hello.txt.gz; it is decompressed while being read
        String filePath = "/Users/admin/Downloads/flink/data/3";
        env.readTextFile(filePath).print();
    }

    public static void readRecursiveFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data";
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        env.readTextFile(filePath).withParameters(parameters)
                .print();
    }

    public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/people.csv";
        // env.readCsvFile(filePath).ignoreFirstLine()
        //         .types(String.class, Integer.class, String.class)
        //         .print();
        env.readCsvFile(filePath).ignoreFirstLine()
                .pojoType(Case.class, "name", "age", "job")
                .print();
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        env.readTextFile(filePath).print();
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Output

hello world welcome
hello welcome

Flink recognizes compressed input by file extension. The supported extensions are .deflate, .gz, .gzip, .bz2 and .xz.
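For comparison, reading gzip-compressed text by hand with the JDK looks roughly like the sketch below. It is not Flink code: GzipLinesSketch and its two methods are hypothetical, and the data is compressed in memory so the example does not depend on a file on disk.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of manual gzip handling with java.util.zip.
class GzipLinesSketch {
    // Compresses the text in memory (stands in for running `gzip hello.txt`).
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Decompresses the bytes and returns the contained text line by line.
    static List<String> readLines(byte[] gzipped) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] compressed = gzip("hello world welcome\nhello welcome\n");
        System.out.println(readLines(compressed)); // prints [hello world welcome, hello welcome]
    }
}
```

With readTextFile none of this wrapping is needed in user code, since the file extension alone selects the decompression.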

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object DataSetDataSourceApp {
  case class Case(name: String, age: Int, job: String)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // fromCollection(env)
    // textFile(env)
    // csvFile(env)
    // readRecursiveFiles(env)
    readCompressionFiles(env)
  }

  def readCompressionFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/3"
    env.readTextFile(filePath).print()
  }

  def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data"
    val parameters = new Configuration
    parameters.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filePath).withParameters(parameters).print()
  }

  def csvFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/people.csv"
    // env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
    env.readCsvFile[Case](filePath, ignoreFirstLine = true, includedFields = Array(0, 1, 2))
      .print()
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.readTextFile(filePath).print()
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Output

hello world welcome
hello welcome

Operators

The map operator

import java.util.Arrays;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(env);
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        // Add 1 to every element
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

2
3
4
5
6
7
8
9
10
11

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(env)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

2
3
4
5
6
7
8
9
10
11

The filter operator

import java.util.Arrays;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        filterFunction(env);
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        // Add 1 to every element, then keep only the values greater than 5
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

6
7
8
9
10
11
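The map-then-filter chain behaves like the same chain on a Java stream, which can be a convenient way to check the expected result without starting a Flink job. The sketch below is only an analogy: MapFilterSketch and its method are hypothetical names, and it runs on java.util.stream rather than on a DataSet.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of map followed by filter with plain Java streams.
class MapFilterSketch {
    // Adds 1 to every value in [from, to] and keeps the results above the threshold.
    static List<Integer> incrementThenKeepAbove(int from, int to, int threshold) {
        return IntStream.rangeClosed(from, to)
                .map(x -> x + 1)
                .filter(x -> x > threshold)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(incrementThenKeepAbove(1, 10, 5)); // prints [6, 7, 8, 9, 10, 11]
    }
}
```

Note that the filter sees the values after the increment, so the input 5 survives as 6 while the input 4 is dropped as 5.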

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    filterFunction(env)
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

6
7
8
9
10
11

The mapPartition operator

mapPartition calls the function once per partition instead of once per element, so the number of results follows the parallelism.

A utility class that simulates a database connection pool

import java.util.Random;

public class DBUntils {
    // Simulates borrowing a connection by returning a random connection id
    public static int getConnection() {
        return new Random().nextInt(10);
    }

    public static void returnConnection(int connection) {

    }
}

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        mapPartitionFunction(env);
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map runs the function once per student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition runs the function once per partition, following the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted; the dots stand for further numbers, and there are 100 values in total above the dashed line)

.
.
.
.
5
4
0
3
-----------
5
5
0
3
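The difference between the 100 values above the dashed line and the 4 values below it comes down to how often the function body runs. The plain-Java sketch below counts those invocations. It is a simplified model with hypothetical names (MapPartitionSketch, partition, openedPerElement, openedPerPartition), and the contiguous chunking is an assumption made for illustration, not how Flink distributes records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of per-element versus per-partition processing.
class MapPartitionSketch {
    // Splits the items into the given number of contiguous chunks (assumed partitioning).
    static <T> List<List<T>> partition(List<T> items, int partitions) {
        List<List<T>> result = new ArrayList<>();
        int size = items.size();
        for (int p = 0; p < partitions; p++) {
            int from = p * size / partitions;
            int to = (p + 1) * size / partitions;
            result.add(new ArrayList<>(items.subList(from, to)));
        }
        return result;
    }

    // map style: one simulated connection is opened for every element.
    static int openedPerElement(List<String> items) {
        int opened = 0;
        for (String item : items) {
            opened++; // open, use and return a connection for this element
        }
        return opened;
    }

    // mapPartition style: one simulated connection is shared by a whole partition.
    static int openedPerPartition(List<List<String>> partitions) {
        int opened = 0;
        for (List<String> part : partitions) {
            opened++; // a single connection serves every element of the partition
        }
        return opened;
    }

    public static void main(String[] args) {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        System.out.println("per element: " + openedPerElement(students)); // prints per element: 100
        System.out.println("per partition: " + openedPerPartition(partition(students, 4))); // prints per partition: 4
    }
}
```

This is why mapPartition suits expensive setup such as database connections: the setup cost is paid once per partition rather than once per record.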

Scala code

import scala.util.Random

object DBUntils {
  def getConnection(): Int = {
    new Random().nextInt(10)
  }

  def returnConnection(connection: Int): Unit = {

  }
}

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    mapPartitionFunction(env)
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output

.
.
.
.
5
4
0
3
-----------
5
5
0
3

The first operator

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        // Take the first three records
        data.first(3).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map runs the function once per student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition runs the function once per partition, following the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

(1,Hadoop)
(1,Spark)
(1,Flink)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.first(3).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(1,Hadoop)
(1,Spark)
(1,Flink)

Take the first two records of each group

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        // group by the first field and keep two records per group
        data.groupBy(0).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

(3,Linux)
(1,Hadoop)
(1,Spark)
(4,VUE)
(2,Java)
(2,Spring boot)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(3,Linux)
(1,Hadoop)
(1,Spark)
(4,VUE)
(2,Java)
(2,Spring boot)
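To see what groupBy(0).first(2) computes without starting a Flink job, the rule can be sketched with plain collections. This is only an illustration under assumptions: GroupFirstSketch, firstPerGroup and sample are hypothetical names, SimpleEntry stands in for Tuple2, and the sketch keeps records in encounter order, whereas Flink describes first-n on an unsorted group as returning arbitrary elements.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupFirstSketch {
    // Same sample records as the Flink example; SimpleEntry stands in for Tuple2.
    static List<SimpleEntry<Integer, String>> sample() {
        return Arrays.asList(
                new SimpleEntry<>(1, "Hadoop"), new SimpleEntry<>(1, "Spark"),
                new SimpleEntry<>(1, "Flink"), new SimpleEntry<>(2, "Java"),
                new SimpleEntry<>(2, "Spring boot"), new SimpleEntry<>(3, "Linux"),
                new SimpleEntry<>(4, "VUE"));
    }

    // Groups by key and keeps at most n values per key, in encounter order.
    static Map<Integer, List<String>> firstPerGroup(List<SimpleEntry<Integer, String>> records, int n) {
        Map<Integer, List<String>> groups = new LinkedHashMap<>();
        for (SimpleEntry<Integer, String> record : records) {
            List<String> group = groups.computeIfAbsent(record.getKey(), k -> new ArrayList<>());
            if (group.size() < n) {
                group.add(record.getValue());
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // prints {1=[Hadoop, Spark], 2=[Java, Spring boot], 3=[Linux], 4=[VUE]}
        System.out.println(firstPerGroup(sample(), 2));
    }
}
```

Group 1 loses its third record, while groups 3 and 4 keep their single record because they have fewer than two.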

Group, sort ascending, then take the first two records

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        // sort each group by the second field in ascending order before taking two records
        data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

(3,Linux)
(1,Flink)
(1,Hadoop)
(4,VUE)
(2,Java)
(2,Spring boot)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(3,Linux)
(1,Flink)
(1,Hadoop)
(4,VUE)
(2,Java)
(2,Spring boot)

Group, sort descending, then take the first two records

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        // sort each group by the second field in descending order before taking two records
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

(3,Linux)
(1,Spark)
(1,Hadoop)
(4,VUE)
(2,Spring boot)
(2,Java)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(3,Linux)
(1,Spark)
(1,Hadoop)
(4,VUE)
(2,Spring boot)
(2,Java)
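The effect of sorting each group before taking two records can be reproduced in plain Java, which makes it easy to check both orders side by side. This is a rough sketch, not Flink code: SortedGroupFirstSketch, firstPerSortedGroup and sample are hypothetical names, SimpleEntry stands in for Tuple2, and a boolean flag stands in for Order.ASCENDING and Order.DESCENDING.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SortedGroupFirstSketch {
    // Same sample records as the Flink example; SimpleEntry stands in for Tuple2.
    static List<SimpleEntry<Integer, String>> sample() {
        return Arrays.asList(
                new SimpleEntry<>(1, "Hadoop"), new SimpleEntry<>(1, "Spark"),
                new SimpleEntry<>(1, "Flink"), new SimpleEntry<>(2, "Java"),
                new SimpleEntry<>(2, "Spring boot"), new SimpleEntry<>(3, "Linux"),
                new SimpleEntry<>(4, "VUE"));
    }

    // Groups by key, sorts the values of each group, then keeps at most n of them.
    static Map<Integer, List<String>> firstPerSortedGroup(
            List<SimpleEntry<Integer, String>> records, int n, boolean ascending) {
        Comparator<String> order = ascending
                ? Comparator.<String>naturalOrder()
                : Comparator.<String>reverseOrder();
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (SimpleEntry<Integer, String> record : records) {
            groups.computeIfAbsent(record.getKey(), k -> new ArrayList<>()).add(record.getValue());
        }
        Map<Integer, List<String>> result = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> group : groups.entrySet()) {
            List<String> sorted = new ArrayList<>(group.getValue());
            sorted.sort(order);
            int limit = Math.min(n, sorted.size());
            result.put(group.getKey(), new ArrayList<>(sorted.subList(0, limit)));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {1=[Flink, Hadoop], 2=[Java, Spring boot], 3=[Linux], 4=[VUE]}
        System.out.println(firstPerSortedGroup(sample(), 2, true));
        // prints {1=[Spark, Hadoop], 2=[Spring boot, Java], 3=[Linux], 4=[VUE]}
        System.out.println(firstPerSortedGroup(sample(), 2, false));
    }
}
```

The sketch prints the groups in key order only because it uses a TreeMap; the Flink runs above emit the groups in a different order.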

The flatMap operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        flatMapFunction(env);
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        // split every line on commas and emit each token as its own record
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class).print();
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

hadoop
spark
hadoop
flink
flink
flink

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    flatMapFunction(env)
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).print()
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

hadoop
spark
hadoop
flink
flink
flink

Of course, Scala also supports the same collector-based form as the Java version

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
  val data = env.fromCollection(info)
  data.flatMap((value, collector: Collector[String]) => {
    val tokens = value.split(",")
    tokens.foreach(collector.collect(_))
  }).print()
}
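For comparison, the same one-to-many expansion exists in the Java standard library, where Stream.flatMap plays the role of the Flink operator. The following is a small sketch under that analogy only; FlatMapSketch and splitAll are hypothetical names and nothing here touches the Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapSketch {
    // Turns every comma-separated line into its tokens and concatenates the results.
    static List<String> splitAll(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        // prints [hadoop, spark, hadoop, flink, flink, flink]
        System.out.println(splitAll(info));
    }
}
```

Three input lines become six records, which is the difference from map, where each input yields exactly one output.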

Counting words

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        flatMapFunction(env);
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        // split into words, pair each word with 1, then sum the counts per word
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .groupBy(0).sum(1).print();
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

(hadoop,2)
(flink,3)
(spark,1)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    flatMapFunction(env)
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0)
      .sum(1).print()
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(hadoop,2)
(flink,3)
(spark,1)
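The flatMap, map to (word, 1), groupBy and sum chain amounts to counting occurrences per word. As a cross-check on the numbers above, here is a plain-Java sketch of the same computation; WordCountSketch and countWords are hypothetical names, and the TreeMap is chosen only so that the printed order is predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class WordCountSketch {
    // Splits each line on commas and counts how often every word occurs.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        // prints {flink=3, hadoop=2, spark=1}
        System.out.println(countWords(info));
    }
}
```

The counts agree with the Flink output; only the order of the entries differs, because the sketch sorts by word.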

The distinct operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        // flatMapFunction(env);
        distinctFunction(env);
    }

    public static void distinctFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        // split into words, then drop duplicate words
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .distinct().print();
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .groupBy(0).sum(1).print();
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

hadoop
flink
spark

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    distinctFunction(env)
  }

  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0)
      .sum(1).print()
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

hadoop
flink
spark
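Deduplication can likewise be checked outside Flink. A minimal sketch, assuming an insertion-ordered set is an acceptable stand-in for distinct(); DistinctSketch and distinctWords are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DistinctSketch {
    // Splits each line on commas and keeps the first occurrence of every word.
    static List<String> distinctWords(List<String> lines) {
        Set<String> seen = new LinkedHashSet<>();
        for (String line : lines) {
            seen.addAll(Arrays.asList(line.split(",")));
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        // prints [hadoop, spark, flink]
        System.out.println(distinctWords(info));
    }
}
```

The sketch keeps first-seen order, while the Flink run above prints the same three words in a different order.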

The join operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        // flatMapFunction(env);
        // distinctFunction(env);
        joinFunction(env);
    }

    public static void joinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = Arrays.asList(
                new Tuple2<>(1, "PK哥"), new Tuple2<>(2, "J哥"),
                new Tuple2<>(3, "小队长"), new Tuple2<>(4, "天空蓝"));
        List<Tuple2<Integer, String>> info2 = Arrays.asList(
                new Tuple2<>(1, "北京"), new Tuple2<>(2, "上海"),
                new Tuple2<>(3, "成都"), new Tuple2<>(4, "杭州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        // join on the first field of both sides and merge the matching records
        data1.join(data2).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second) throws Exception {
                        return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
                    }
                }).print();
    }

    public static void distinctFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .distinct().print();
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .groupBy(0).sum(1).print();
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,杭州)
(2,J哥,上海)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    // distinctFunction(env)
    joinFunction(env)
  }

  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List((1, "PK哥"), (2, "J哥"), (3, "小队长"), (4, "天空蓝"))
    val info2 = List((1, "北京"), (2, "上海"), (3, "成都"), (4, "杭州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.join(data2).where(0).equalTo(0).apply((first, second) =>
      (first._1, first._2, second._2)
    ).print()
  }

  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0)
      .sum(1).print()
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operation
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,杭州)
(2,J哥,上海)
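Conceptually, where(0).equalTo(0) pairs every left record with the right records that share its key. One common way to compute such an equi-join by hand is a hash join: index one side by key, then probe it with the other. The sketch below shows that idea in plain Java; it is not a description of how Flink executes the join, the class and method names are hypothetical, and the sample values are ASCII stand-ins for the names and cities used above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InnerJoinSketch {
    // ASCII stand-ins for the people in the example.
    static List<SimpleEntry<Integer, String>> people() {
        return Arrays.asList(new SimpleEntry<>(1, "PK"), new SimpleEntry<>(2, "J"),
                new SimpleEntry<>(3, "Captain"), new SimpleEntry<>(4, "SkyBlue"));
    }

    // ASCII stand-ins for the cities in the example.
    static List<SimpleEntry<Integer, String>> cities() {
        return Arrays.asList(new SimpleEntry<>(1, "Beijing"), new SimpleEntry<>(2, "Shanghai"),
                new SimpleEntry<>(3, "Chengdu"), new SimpleEntry<>(4, "Hangzhou"));
    }

    static List<String> innerJoin(List<SimpleEntry<Integer, String>> left,
                                  List<SimpleEntry<Integer, String>> right) {
        // Build phase: index the right side by key.
        Map<Integer, List<String>> index = new HashMap<>();
        for (SimpleEntry<Integer, String> r : right) {
            index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        // Probe phase: emit one row per matching pair; unmatched left records are dropped.
        List<String> joined = new ArrayList<>();
        for (SimpleEntry<Integer, String> l : left) {
            for (String match : index.getOrDefault(l.getKey(), Collections.<String>emptyList())) {
                joined.add("(" + l.getKey() + "," + l.getValue() + "," + match + ")");
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // prints [(1,PK,Beijing), (2,J,Shanghai), (3,Captain,Chengdu), (4,SkyBlue,Hangzhou)]
        System.out.println(innerJoin(people(), cities()));
    }
}
```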

The outer join operator

Left outer join
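A left outer join keeps every record of the left side; when the right side has no record with the same key, the join function receives null for it, which is why the Flink code that follows checks second == null and fills in "-". The same rule can be sketched in plain Java first. This is only an illustration: the class and method names are hypothetical, and the sample values are ASCII stand-ins for the names and cities in the example, with key 5 on the right side so that key 4 has no match.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LeftOuterJoinSketch {
    static List<SimpleEntry<Integer, String>> people() {
        return Arrays.asList(new SimpleEntry<>(1, "PK"), new SimpleEntry<>(2, "J"),
                new SimpleEntry<>(3, "Captain"), new SimpleEntry<>(4, "SkyBlue"));
    }

    // Key 5 instead of 4, so the fourth person has no city.
    static List<SimpleEntry<Integer, String>> cities() {
        return Arrays.asList(new SimpleEntry<>(1, "Beijing"), new SimpleEntry<>(2, "Shanghai"),
                new SimpleEntry<>(3, "Chengdu"), new SimpleEntry<>(5, "Hangzhou"));
    }

    static List<String> leftOuterJoin(List<SimpleEntry<Integer, String>> left,
                                      List<SimpleEntry<Integer, String>> right,
                                      String placeholder) {
        Map<Integer, List<String>> index = new HashMap<>();
        for (SimpleEntry<Integer, String> r : right) {
            index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        List<String> joined = new ArrayList<>();
        for (SimpleEntry<Integer, String> l : left) {
            List<String> matches = index.get(l.getKey());
            if (matches == null) {
                // No partner on the right: keep the left record and use the placeholder.
                joined.add("(" + l.getKey() + "," + l.getValue() + "," + placeholder + ")");
            } else {
                for (String match : matches) {
                    joined.add("(" + l.getKey() + "," + l.getValue() + "," + match + ")");
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // prints [(1,PK,Beijing), (2,J,Shanghai), (3,Captain,Chengdu), (4,SkyBlue,-)]
        System.out.println(leftOuterJoin(people(), cities(), "-"));
    }
}
```

The right-side record with key 5 never appears in the result, because only left records drive the output.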

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        // flatMapFunction(env);
        // distinctFunction(env);
        // joinFunction(env);
        outJoinFunction(env);
    }

    public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = Arrays.asList(
                new Tuple2<>(1, "PK哥"), new Tuple2<>(2, "J哥"),
                new Tuple2<>(3, "小队长"), new Tuple2<>(4, "天空蓝"));
        List<Tuple2<Integer, String>> info2 = Arrays.asList(
                new Tuple2<>(1, "北京"), new Tuple2<>(2, "上海"),
                new Tuple2<>(3, "成都"), new Tuple2<>(5, "杭州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        data1.leftOuterJoin(data2).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second) throws Exception {
                        // no matching record on the right side: fill in a placeholder
                        if (second == null) {
                            return new Tuple3<>(first.getField(0), first.getField(1), "-");
                        }
                        return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
                    }
                }).print();
    }

    public static void joinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = Arrays.asList(
                new Tuple2<>(1, "PK哥"), new Tuple2<>(2, "J哥"),
                new Tuple2<>(3, "小队长"), new Tuple2<>(4, "天空蓝"));
        List<Tuple2<Integer, String>> info2 = Arrays.asList(
                new Tuple2<>(1, "北京"), new Tuple2<>(2, "上海"),
                new Tuple2<>(3, "成都"), new Tuple2<>(4, "杭州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        data1.join(data2).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second) throws Exception {
                        return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
                    }
                }).print();
    }

    public static void distinctFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .distinct().print();
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .groupBy(0).sum(1).print();
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(
                new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map is invoked once per element, so a connection is requested for every student
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition is invoked once per partition, so the number of calls follows the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operation
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

public static void filterFunction(ExecutionEnvironment env) throws Exception {
    DataSource<Integer> data = env.fromCollection(Arrays._asList_(1,2,3,4,5,6,7,8,9,10));

data.map(x -> x + 1).filter(x -> x > 5).print(); }

public static void mapFunction(ExecutionEnvironment env) throws Exception {
    DataSource<Integer> data = env.fromCollection(Arrays._asList_(1,2,3,4,5,6,7,8,9,10));

data.map(x -> x + 1).print(); } }

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(2,J哥,上海)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    // distinctFunction(env)
    joinFunction(env)
  }

  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List((1, "PK哥"), (2, "J哥"), (3, "小队长"), (4, "天空蓝"))
    val info2 = List((1, "北京"), (2, "上海"), (3, "成都"), (5, "杭州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      // In a left outer join the right side is null when no match exists
      if (second == null) {
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0)
      .sum(1).print()
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO: database operations
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO: database operations
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(2,J哥,上海)

Right outer join

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        // flatMapFunction(env);
        // distinctFunction(env);
        // joinFunction(env);
        outJoinFunction(env);
    }

    public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = Arrays.asList(new Tuple2<>(1, "PK哥"),
                new Tuple2<>(2, "J哥"),
                new Tuple2<>(3, "小队长"),
                new Tuple2<>(4, "天空蓝"));
        List<Tuple2<Integer, String>> info2 = Arrays.asList(new Tuple2<>(1, "北京"),
                new Tuple2<>(2, "上海"),
                new Tuple2<>(3, "成都"),
                new Tuple2<>(5, "杭州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        data1.rightOuterJoin(data2).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second) throws Exception {
                        // In a right outer join the left side is null when no match exists
                        if (first == null) {
                            return new Tuple3<>(second.getField(0), "-", second.getField(1));
                        }
                        return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
                    }
                }).print();
    }

    // joinFunction, distinctFunction, flatMapFunction, firstFunction, mapPartitionFunction,
    // filterFunction and mapFunction are unchanged from the previous listing.
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(5,-,杭州)
(2,J哥,上海)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    // distinctFunction(env)
    joinFunction(env)
  }

  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List((1, "PK哥"), (2, "J哥"), (3, "小队长"), (4, "天空蓝"))
    val info2 = List((1, "北京"), (2, "上海"), (3, "成都"), (5, "杭州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      // In a right outer join the left side is null when no match exists
      if (first == null) {
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

  // distinctFunction, flatMapFunction, firstFunction, mapPartitionFunction,
  // filterFunction and mapFunction are unchanged from the previous listing.
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(5,-,杭州)
(2,J哥,上海)

Full outer join

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        // flatMapFunction(env);
        // distinctFunction(env);
        // joinFunction(env);
        outJoinFunction(env);
    }

    public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = Arrays.asList(new Tuple2<>(1, "PK哥"),
                new Tuple2<>(2, "J哥"),
                new Tuple2<>(3, "小队长"),
                new Tuple2<>(4, "天空蓝"));
        List<Tuple2<Integer, String>> info2 = Arrays.asList(new Tuple2<>(1, "北京"),
                new Tuple2<>(2, "上海"),
                new Tuple2<>(3, "成都"),
                new Tuple2<>(5, "杭州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        data1.fullOuterJoin(data2).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second) throws Exception {
                        // In a full outer join either side may be null when no match exists
                        if (first == null) {
                            return new Tuple3<>(second.getField(0), "-", second.getField(1));
                        } else if (second == null) {
                            return new Tuple3<>(first.getField(0), first.getField(1), "-");
                        }
                        return new Tuple3<>(first.getField(0), first.getField(1), second.getField(1));
                    }
                }).print();
    }

    // joinFunction, distinctFunction, flatMapFunction, firstFunction, mapPartitionFunction,
    // filterFunction and mapFunction are unchanged from the previous listing.
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(5,-,杭州)
(2,J哥,上海)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    // distinctFunction(env)
    joinFunction(env)
  }

  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List((1, "PK哥"), (2, "J哥"), (3, "小队长"), (4, "天空蓝"))
    val info2 = List((1, "北京"), (2, "上海"), (3, "成都"), (5, "杭州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      // In a full outer join either side may be null when no match exists
      if (first == null) {
        (second._1, "-", second._2)
      } else if (second == null) {
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

  // distinctFunction, flatMapFunction, firstFunction, mapPartitionFunction,
  // filterFunction and mapFunction are unchanged from the previous listing.
}

Output (logs omitted)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(5,-,杭州)
(2,J哥,上海)
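
The difference between the inner, left, right and full outer joins can also be seen without Flink. The following is a minimal plain-Java sketch, not Flink's implementation: it assumes each key occurs at most once per side (so a `Map` is enough), and the class name `OuterJoinSketch`, the `join` method, its two flags and the sample values are all made up for illustration. Unmatched sides are rendered as "-", as in the join functions above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class OuterJoinSketch {

    /**
     * Joins two id -> value maps on their keys.
     * keepUnmatchedLeft keeps rows that exist only on the left (left/full outer join),
     * keepUnmatchedRight keeps rows that exist only on the right (right/full outer join).
     * With both flags false this is an inner join.
     */
    public static List<String> join(Map<Integer, String> left, Map<Integer, String> right,
                                    boolean keepUnmatchedLeft, boolean keepUnmatchedRight) {
        // Visit every key that occurs on either side, in ascending order.
        TreeSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<String> rows = new ArrayList<>();
        for (Integer key : keys) {
            boolean inLeft = left.containsKey(key);
            boolean inRight = right.containsKey(key);
            if (!inRight && !keepUnmatchedLeft) {
                continue; // left-only row is dropped
            }
            if (!inLeft && !keepUnmatchedRight) {
                continue; // right-only row is dropped
            }
            String leftValue = inLeft ? left.get(key) : "-";
            String rightValue = inRight ? right.get(key) : "-";
            rows.add("(" + key + "," + leftValue + "," + rightValue + ")");
        }
        return rows;
    }

    public static void main(String[] args) {
        // Same key layout as the example data: key 4 only on the left, key 5 only on the right.
        Map<Integer, String> names = new LinkedHashMap<>();
        names.put(1, "name1");
        names.put(2, "name2");
        names.put(3, "name3");
        names.put(4, "name4");
        Map<Integer, String> cities = new LinkedHashMap<>();
        cities.put(1, "city1");
        cities.put(2, "city2");
        cities.put(3, "city3");
        cities.put(5, "city5");

        System.out.println("left:  " + join(names, cities, true, false));
        System.out.println("right: " + join(names, cities, false, true));
        System.out.println("full:  " + join(names, cities, true, true));
    }
}
```

Unlike the Flink output above, the rows here come out sorted by key, because the sketch iterates over a `TreeSet`.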

The cross operator

Cartesian product

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // mapFunction(env);
        // filterFunction(env);
        // mapPartitionFunction(env);
        // firstFunction(env);
        // flatMapFunction(env);
        // distinctFunction(env);
        // joinFunction(env);
        // outJoinFunction(env);
        crossFunction(env);
    }

    public static void crossFunction(ExecutionEnvironment env) throws Exception {
        List<String> info1 = Arrays.asList("曼联", "曼城");
        List<Integer> info2 = Arrays.asList(3, 1, 0);
        DataSource<String> data1 = env.fromCollection(info1);
        DataSource<Integer> data2 = env.fromCollection(info2);
        // Pairs every element of data1 with every element of data2
        data1.cross(data2).print();
    }

    // outJoinFunction, joinFunction, distinctFunction, flatMapFunction, firstFunction,
    // mapPartitionFunction, filterFunction and mapFunction are unchanged from the previous listing.
}

Output (logs omitted)

(曼联,3)
(曼联,1)
(曼联,0)
(曼城,3)
(曼城,1)
(曼城,0)

Scala code

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // mapPartitionFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    // distinctFunction(env)
    // joinFunction(env)
    crossFunction(env)
  }

  def crossFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List("曼联", "曼城")
    val info2 = List(3, 1, 0)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    // Pairs every element of data1 with every element of data2
    data1.cross(data2).print()
  }

  // joinFunction, distinctFunction, flatMapFunction, firstFunction, mapPartitionFunction,
  // filterFunction and mapFunction are unchanged from the previous listing.
}

Output (logs omitted)

(曼联,3)
(曼联,1)
(曼联,0)
(曼城,3)
(曼城,1)
(曼城,0)
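
What cross computes is simply a nested loop over both inputs. The following plain-Java sketch shows this under that assumption; the class name `CrossSketch`, the `cross` method and the sample values are illustrative only and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrossSketch {

    /** Pairs every element of first with every element of second (Cartesian product). */
    public static <A, B> List<String> cross(List<A> first, List<B> second) {
        List<String> pairs = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                pairs.add("(" + a + "," + b + ")");
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> teams = Arrays.asList("Man Utd", "Man City");
        List<Integer> points = Arrays.asList(3, 1, 0);
        // 2 teams x 3 scores = 6 pairs
        cross(teams, points).forEach(System.out::println);
    }
}
```

The result always has size(first) × size(second) elements, so a cross over two large data sets grows very quickly.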

Sink (output)

Create a new folder named sink-out under the flink folder. At this point the folder is empty.

Writing to a text file

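import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DataSetSinkApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataSource<Integer> text = env.fromCollection(data);
        String filePath = "/Users/admin/Downloads/flink/sink-out/sinkjava/";
        text.writeAsText(filePath);
        // writeAsText only declares the sink; execute() actually runs the job
        env.execute("DataSetSinkApp");
    }
}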

Output

Open the sink-out folder to see the result.

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1 to 10
    val text = env.fromCollection(data)
    val filePath = "/Users/admin/Downloads/flink/sink-out/sinkscala/"
    text.writeAsText(filePath)
    env.execute("DataSetSinkApp")
  }
}


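If we run the code again at this point, it fails because the output file already exists. To overwrite the file, pass a write mode to writeAsText: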

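import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;

public class DataSetSinkApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataSource<Integer> text = env.fromCollection(data);
        String filePath = "/Users/admin/Downloads/flink/sink-out/sinkjava/";
        // OVERWRITE replaces existing output instead of failing
        text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
        env.execute("DataSetSinkApp");
    }
}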

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1 to 10
    val text = env.fromCollection(data)
    val filePath = "/Users/admin/Downloads/flink/sink-out/sinkscala/"
    // OVERWRITE replaces existing output instead of failing
    text.writeAsText(filePath, WriteMode.OVERWRITE)
    env.execute("DataSetSinkApp")
  }
}
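
The same "fail if the output exists" versus "overwrite" behaviour can be reproduced with the JDK alone. The sketch below is only an analogy to Flink's `FileSystem.WriteMode` (`NO_OVERWRITE` / `OVERWRITE`), not its implementation; the class name `WriteModeSketch`, the `write` method and the temporary directory are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

public class WriteModeSketch {

    /** Writes lines to target; refuses to replace an existing file unless overwrite is true. */
    public static void write(Path target, List<String> lines, boolean overwrite) throws IOException {
        if (overwrite) {
            // CREATE + TRUNCATE_EXISTING: replace whatever is already there.
            Files.write(target, lines, StandardOpenOption.CREATE,
                    StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
        } else {
            // CREATE_NEW: throws FileAlreadyExistsException if the file exists.
            Files.write(target, lines, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("sink-out").resolve("sinkjava");

        write(target, Arrays.asList("1", "2", "3"), false); // first run: file is created
        try {
            write(target, Arrays.asList("4", "5"), false);  // second run: file already exists
        } catch (FileAlreadyExistsException e) {
            System.out.println("second run failed: output already exists");
        }
        write(target, Arrays.asList("4", "5"), true);       // overwrite mode: succeeds
        System.out.println(Files.readAllLines(target));     // prints [4, 5]
    }
}
```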

Increasing the parallelism to write multiple files

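import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;

public class DataSetSinkApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataSource<Integer> text = env.fromCollection(data);
        String filePath = "/Users/admin/Downloads/flink/sink-out/sinkjava/";
        // A sink with parallelism 4 writes one file per parallel instance
        text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(4);
        env.execute("DataSetSinkApp");
    }
}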

Output

sinkjava is now a folder, and it contains 4 files.

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1 to 10
    val text = env.fromCollection(data)
    val filePath = "/Users/admin/Downloads/flink/sink-out/sinkscala/"
    // A sink with parallelism 4 writes one file per parallel instance
    text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(4)
    env.execute("DataSetSinkApp")
  }
}

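
Why a parallel sink produces a folder with several files can be sketched in plain Java: each parallel instance receives part of the data and writes its own file. This is a simplified model, not Flink's code. It assumes the elements are spread round-robin and that files are named by a 1-based instance index; both are assumptions of this sketch, and the class name `ParallelSinkSketch` and its methods are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ParallelSinkSketch {

    /** Splits elements round-robin into `parallelism` buckets, one per simulated sink instance. */
    public static List<List<Integer>> partition(List<Integer> elements, int parallelism) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            buckets.get(i % parallelism).add(elements.get(i));
        }
        return buckets;
    }

    /** Writes each bucket to its own file inside dir and returns the number of files written. */
    public static int writeBuckets(Path dir, List<List<Integer>> buckets) throws IOException {
        Files.createDirectories(dir);
        for (int i = 0; i < buckets.size(); i++) {
            List<String> lines = new ArrayList<>();
            for (Integer value : buckets.get(i)) {
                lines.add(String.valueOf(value));
            }
            // Hypothetical naming scheme: file "1" for the first instance, "2" for the second, ...
            Files.write(dir.resolve(String.valueOf(i + 1)), lines);
        }
        return buckets.size();
    }

    public static void main(String[] args) throws IOException {
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        Path dir = Files.createTempDirectory("sink-out").resolve("sinkjava");
        int files = writeBuckets(dir, partition(data, 4));
        System.out.println(files + " files written to " + dir);
    }
}
```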

Counters

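import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class CounterApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");
        String filePath = "/Users/admin/Downloads/flink/sink-out/sink-java-counter/";
        data.map(new RichMapFunction<String, String>() {
            LongCounter counter = new LongCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Register the counter under a name so the result can be looked up after the job
                getRuntimeContext().addAccumulator("ele-counts-java", counter);
            }

            @Override
            public String map(String value) throws Exception {
                counter.add(1);
                return value;
            }
        }).writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(4);
        JobExecutionResult jobResult = env.execute("CounterApp");
        // Read the merged counter value from the job result
        Long num = jobResult.getAccumulatorResult("ele-counts-java");
        System.out.println("num: " + num);
    }
}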

Result (logs omitted)

num: 5

Scala code

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

object CounterApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
    val filePath = "/Users/admin/Downloads/flink/sink-out/sink-scala-counter/"
    data.map(new RichMapFunction[String, String]() {
      val counter = new LongCounter

      override def open(parameters: Configuration): Unit = {
        getRuntimeContext.addAccumulator("ele-counts-scala", counter)
      }

      override def map(value: String) = {
        counter.add(1)
        value
      }
    }).writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(4)
    val jobResult = env.execute("CounterApp")
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
    println("num: " + num)
  }
}

Result (logs omitted)

num: 5

Distributed cache

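import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.List;

public class DistriutedCacheApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        // Register the file under a name; Flink ships it to every task
        env.registerCachedFile(filePath, "pk-java-dc");
        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");
        data.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Look up the local copy of the cached file by its registered name
                File dcFile = getRuntimeContext().getDistributedCache().getFile("pk-java-dc");
                List<String> lines = FileUtils.readLines(dcFile);
                lines.forEach(System.out::println);
            }

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).print();
    }
}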

Result (logs omitted)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

Scala code

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

object DistriutedCacheApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.registerCachedFile(filePath, "pk-scala-dc")
    val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
    data.map(new RichMapFunction[String, String] {
      override def open(parameters: Configuration): Unit = {
        val dcFile = getRuntimeContext.getDistributedCache.getFile("pk-scala-dc")
        val lines = FileUtils.readLines(dcFile)
        // readLines returns a java.util.List, so convert it before iterating
        lines.asScala.foreach(println(_))
      }

      override def map(value: String) = {
        value
      }
    }).print()
  }
}

Result (logs omitted)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

Stream processing

Socket source

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

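Before running the program, start a listener in a terminal:

nc -lk 9999

Once the program is running, type lines of text into that terminal; the job prints each line it receives.

Result (logs omitted)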

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

Result (logs omitted)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

Custom data sources

Non-parallel source

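import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomNonParallelSourceFunction implements SourceFunction<Long> {
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // Emit an increasing number once per second until cancelled
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}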

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(env);
        nonParallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

Result (logs omitted; one line is printed per second)

1
2
3
4
5
6
.
.
.

Because this source is not parallel, setting its parallelism above 1 raises an error. For example:

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(env);
        nonParallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        // setParallelism(2) is rejected for a source that only implements SourceFunction
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction())
                .setParallelism(2);
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

The resulting error

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
    at com.guanjian.flink.java.test.DataStreamSourceApp.nonParallelSourceFunction(DataStreamSourceApp.java:17)
    at com.guanjian.flink.java.test.DataStreamSourceApp.main(DataStreamSourceApp.java:11)

Scala code

import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallelSourceFunction extends SourceFunction[Long] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile private var isRunning = true
  private var count = 1L

  override def cancel(): Unit = {
    isRunning = false
  }

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
}

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    nonParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.print().setParallelism(1)
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

Result (logs omitted; one line is printed per second)

1
2
3
4
5
6
.
.
.

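Parallel source

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class CustomParallelSourceFunction implements ParallelSourceFunction<Long> {
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}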

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(env);
        // nonParallelSourceFunction(env);
        parallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomParallelSourceFunction())
                .setParallelism(2);
        data.print().setParallelism(1);
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

Result (logs omitted; two lines are printed each second, one from each parallel source instance)

1
1
2
2
3
3
4
4
5
5
.
.
.
.

Scala code

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile private var isRunning = true
  private var count = 1L

  override def cancel(): Unit = {
    isRunning = false
  }

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
}

import com.guanjian.flink.scala.until.{CustomNonParallelSourceFunction, CustomParallelSourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    // nonParallelSourceFunction(env)
    parallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomParallelSourceFunction).setParallelism(2)
    data.print().setParallelism(1)
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.print().setParallelism(1)
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

Result (logs omitted; two lines are printed each second, one from each parallel source instance)

1
1
2
2
3
3
4
4
5
5
.
.
.
.

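Rich (enhanced) parallel source

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 1;

    /**
     * Code for additional needs, such as setup work, can be placed here.
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}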

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(env);
        // nonParallelSourceFunction(env);
        // parallelSourceFunction(env);
        richParallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void richParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomRichParallelSourceFunction())
                .setParallelism(2);
        data.print().setParallelism(1);
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomParallelSourceFunction())
                .setParallelism(2);
        data.print().setParallelism(1);
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

The output is the same as for the parallel source.

Scala code

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile private var isRunning = true
  private var count = 1L

  override def open(parameters: Configuration): Unit = {
  }

  override def close(): Unit = {
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
}

import com.guanjian.flink.scala.until.{CustomNonParallelSourceFunction, CustomParallelSourceFunction, CustomRichParallelSourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    // nonParallelSourceFunction(env)
    // parallelSourceFunction(env)
    richParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomRichParallelSourceFunction).setParallelism(2)
    data.print().setParallelism(1)
  }

  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomParallelSourceFunction).setParallelism(2)
    data.print().setParallelism(1)
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.print().setParallelism(1)
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

The output is the same as for the parallel source.

Stream operators

map and filter

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamTransformationApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        filterFunction(env);
        env.execute("DataStreamTransformationApp");
    }

    public static void filterFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.map(x -> {
            // "接收到" means "received"; every element is logged before filtering
            System.out.println("接收到: " + x);
            return x;
        }).filter(x -> x % 2 == 0).print().setParallelism(1);
    }
}

Result (logs omitted)

接收到: 1
接收到: 2
2
接收到: 3
接收到: 4
4
接收到: 5
接收到: 6
6
.
.

Scala code

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      // "接收到" means "received"
      println("接收到: " + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(1)
  }
}

Result (logs omitted)

接收到: 1
接收到: 2
2
接收到: 3
接收到: 4
4
接收到: 5
接收到: 6
6
.
.

union

public class DataStreamTransformationApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // filterFunction(env);
        unionFunction(env);
        env.execute("DataStreamTransformationApp");
    }

    public static void unionFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data1 = env.addSource(new CustomNonParallelSourceFunction());
        DataStreamSource<Long> data2 = env.addSource(new CustomNonParallelSourceFunction());
        // union merges both streams into one; each source keeps its own counter
        data1.union(data2).print().setParallelism(1);
    }

    public static void filterFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.map(x -> {
            System.out.println("接收到: " + x);
            return x;
        }).filter(x -> x % 2 == 0).print().setParallelism(1);
    }
}

Result (logs omitted)

1
1
2
2
3
3
4
4
5
5
.
.

Scala code

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // filterFunction(env)
    unionFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def unionFunction(env: StreamExecutionEnvironment): Unit = {
    val data1 = env.addSource(new CustomNonParallelSourceFunction)
    val data2 = env.addSource(new CustomNonParallelSourceFunction)
    data1.union(data2).print().setParallelism(1)
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("接收到: " + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(1)
  }
}

Result (logs omitted)

1
1
2
2
3
3
4
4
5
5
.
.

split and select

split divides one stream into several named streams, and select picks one of them.

public class DataStreamTransformationApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // filterFunction(env);
        // unionFunction(env);
        splitSelectFunction(env);
        env.execute("DataStreamTransformationApp");
    }

    public static void splitSelectFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        SplitStream<Long> splits = data.split(value -> {
            List<String> list = new ArrayList<>();
            if (value % 2 == 0) {
                list.add("偶数"); // "even"
            } else {
                list.add("奇数"); // "odd"
            }
            return list;
        });
        // Keep only the elements tagged "奇数" (odd)
        splits.select("奇数").print().setParallelism(1);
    }

    public static void unionFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data1 = env.addSource(new CustomNonParallelSourceFunction());
        DataStreamSource<Long> data2 = env.addSource(new CustomNonParallelSourceFunction());
        data1.union(data2).print().setParallelism(1);
    }

    public static void filterFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.map(x -> {
            System.out.println("接收到: " + x);
            return x;
        }).filter(x -> x % 2 == 0).print().setParallelism(1);
    }
}

Result (logs omitted)

1
3
5
7
9
11
.
.

Scala code

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import java.util.ArrayList
import java.lang.Iterable

object DataStreamTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // filterFunction(env)
    // unionFunction(env)
    splitSelectFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    val splits = data.split(new OutputSelector[Long] {
      override def select(value: Long): Iterable[String] = {
        val list = new ArrayList[String]
        if (value % 2 == 0) {
          list.add("偶数") // "even"
        } else {
          list.add("奇数") // "odd"
        }
        list
      }
    })
    splits.select("奇数").print().setParallelism(1)
  }

  def unionFunction(env: StreamExecutionEnvironment): Unit = {
    val data1 = env.addSource(new CustomNonParallelSourceFunction)
    val data2 = env.addSource(new CustomNonParallelSourceFunction)
    data1.union(data2).print().setParallelism(1)
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("接收到: " + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(1)
  }
}

Result (logs omitted)

1
3
5
7
9
11
.
.

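Note that split has been marked as deprecated:

@deprecated def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))

The return type of the OutputSelector functional interface is a Java-specific type (java.lang.Iterable), which is unfriendly to Scala, so the Scala version cannot use a lambda expression here either.

select can also pick several streams at once: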

public class DataStreamTransformationApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // filterFunction(env);
        // unionFunction(env);
        splitSelectFunction(env);
        env.execute("DataStreamTransformationApp");
    }

    public static void splitSelectFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        SplitStream<Long> splits = data.split(value -> {
            List<String> list = new ArrayList<>();
            if (value % 2 == 0) {
                list.add("偶数"); // "even"
            } else {
                list.add("奇数"); // "odd"
            }
            return list;
        });
        // Selecting both names yields every element again
        splits.select("奇数", "偶数").print().setParallelism(1);
    }

    public static void unionFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data1 = env.addSource(new CustomNonParallelSourceFunction());
        DataStreamSource<Long> data2 = env.addSource(new CustomNonParallelSourceFunction());
        data1.union(data2).print().setParallelism(1);
    }

    public static void filterFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.map(x -> {
            System.out.println("接收到: " + x);
            return x;
        }).filter(x -> x % 2 == 0).print().setParallelism(1);
    }
}

Result (logs omitted)

1
2
3
4
5
6
.
.

The Scala code changes in the same way, so it is not repeated here.
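To make the split/select semantics concrete without a running cluster, here is a minimal plain-Java sketch that does not use Flink at all. It only imitates the idea: a selector tags each element with one or more names, and select keeps the elements whose tags match any requested name. The class name SplitSelectSketch, the helper select, and the tags "odd" and "even" (stand-ins for "奇数" and "偶数") are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Plain-Java imitation of split/select; hypothetical helper, not Flink API. */
class SplitSelectSketch {

    /** Keeps every element that the selector tags with at least one of the requested names. */
    static <T> List<T> select(List<T> elements,
                              Function<T, List<String>> selector,
                              String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        List<T> out = new ArrayList<>();
        for (T element : elements) {
            for (String tag : selector.apply(element)) {
                if (wanted.contains(tag)) {
                    out.add(element);
                    break; // emit each element at most once
                }
            }
        }
        return out;
    }

    /** Tags a number as "even" or "odd", mirroring the selector in the listings above. */
    static List<String> parity(Long value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        System.out.println(select(input, SplitSelectSketch::parity, "odd"));
        System.out.println(select(input, SplitSelectSketch::parity, "odd", "even"));
    }
}
```

Selecting one name returns only the matching elements, while selecting both names returns the whole input in its original order, which is the behaviour the two Flink runs above show.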

Stream sinks

Custom sink

Writing the data received from a socket into MySQL

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Student {
    private int id;
    private String name;
    private int age;
}

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkToMySQL extends RichSinkFunction<Student> {
    private Connection connection;
    private PreparedStatement pstmt;

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://127.0.0.1:3306/flink";
            conn = DriverManager.getConnection(url, "root", "abcd123");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    // open() runs once per sink task: create the connection and the prepared statement here
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into student(id,name,age) values (?,?,?)";
        pstmt = connection.prepareStatement(sql);
    }

    // invoke() runs once per record
    @Override
    public void invoke(Student value) throws Exception {
        System.out.println("invoke--------");
        pstmt.setInt(1, value.getId());
        pstmt.setString(2, value.getName());
        pstmt.setInt(3, value.getAge());
        pstmt.executeUpdate();
    }

    // close() releases the JDBC resources
    @Override
    public void close() throws Exception {
        super.close();
        if (pstmt != null) {
            pstmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomSinkToMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999);
        // Each input line has the form id,name,age
        SingleOutputStreamOperator<Student> studentStream = source.map(value -> {
            String[] splits = value.split(",");
            Student stu = new Student(Integer.parseInt(splits[0]), splits[1], Integer.parseInt(splits[2]));
            return stu;
        }).returns(Student.class);
        studentStream.addSink(new SinkToMySQL());
        env.execute("CustomSinkToMySQL");
    }
}
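The map step above assumes that every line has exactly three comma-separated fields with numeric id and age; a malformed line would throw an exception inside the job. As a rough sketch of a more defensive parser in plain Java (no Flink or Lombok needed), the following may help. The class name StudentLineParser, its nested Student stand-in, and the choice of Optional as the return type are assumptions made for illustration, not part of the original code.

```java
import java.util.Optional;

/** Hypothetical helper: parses "id,name,age" lines and rejects malformed input. */
class StudentLineParser {

    /** Minimal stand-in for the Student POJO used in the listings above. */
    static final class Student {
        final int id;
        final String name;
        final int age;

        Student(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    /** Returns the parsed student, or Optional.empty() if the line is not valid. */
    static Optional<Student> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split(",");
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(parts[0].trim());
            int age = Integer.parseInt(parts[2].trim());
            return Optional.of(new Student(id, parts[1].trim(), age));
        } catch (NumberFormatException e) {
            // A non-numeric id or age makes the whole line invalid
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1,tom,18").isPresent());
        System.out.println(parse("not a record").isPresent());
    }
}
```

In a job, one option would be to filter out the empty results before the sink, so that a single bad line does not fail the whole pipeline.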

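Before running the code, start a listener:

nc -lk 9999

After starting the job, type comma-separated records of the form id,name,age into the nc terminal. Each record is inserted as a row into the student table.

Scala code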

class Student(var id: Int, var name: String, var age: Int) {
}

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.guanjian.flink.scala.test.Student
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class SinkToMySQL extends RichSinkFunction[Student] {
  var connection: Connection = null
  var pstmt: PreparedStatement = null

  def getConnection: Connection = {
    Class.forName("com.mysql.cj.jdbc.Driver")
    val url = "jdbc:mysql://127.0.0.1:3306/flink"
    DriverManager.getConnection(url, "root", "abcd123")
  }

  override def open(parameters: Configuration): Unit = {
    connection = getConnection
    val sql = "insert into student(id,name,age) values (?,?,?)"
    pstmt = connection.prepareStatement(sql)
  }

  override def invoke(value: Student): Unit = {
    println("invoke--------")
    pstmt.setInt(1, value.id)
    pstmt.setString(2, value.name)
    pstmt.setInt(3, value.age)
    pstmt.executeUpdate()
  }

  override def close(): Unit = {
    if (pstmt != null) {
      pstmt.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}

import com.guanjian.flink.scala.test.Student
import com.guanjian.flink.scala.until.SinkToMySQL
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object CustomSinkToMySQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.socketTextStream("127.0.0.1", 9999)
    // Each input line has the form id,name,age
    val studentStream = source.map(value => {
      val splits = value.split(",")
      val stu = new Student(splits(0).toInt, splits(1), splits(2).toInt)
      stu
    })
    studentStream.addSink(new SinkToMySQL)
    env.execute("CustomSinkToMySQL")
  }
}

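As with the Java version, records typed into the nc terminal are inserted into the student table.

Table API and SQL

To use Flink's Table API, a Java project also needs the Scala dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>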

Add a sales.csv file to the data directory with the following content:

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,3,505.0
113,3,3,510.0
114,4,4,600.0
115,1,2,500.0
116,1,2,500.0
117,1,2,500.0
118,1,2,600.0
119,2,3,400.0
120,1,2,500.0
121,1,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,600.0

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class TableSQLAPI {
    @Data
    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    public static class SalesLog {
        private String transactionId;
        private String customerId;
        private String itemId;
        private Double amountPaid;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
        String filePath = "/Users/admin/Downloads/flink/data/sales.csv";
        // Skip the header line and map the four columns onto the POJO fields
        DataSource<SalesLog> csv = env.readCsvFile(filePath).ignoreFirstLine()
                .pojoType(SalesLog.class, "transactionId", "customerId", "itemId", "amountPaid");
        Table sales = tableEnv.fromDataSet(csv);
        tableEnv.registerTable("sales", sales);
        // Total amount paid per customer
        Table resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaid) money from sales "
                + "group by customerId");
        DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
        result.print();
    }
}

Result (logs omitted)

3,510.0
4,600.0
1,4800.0
2,905.0
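As a cross-check of these totals, the same group-by-and-sum can be reproduced in plain Java with streams, without Flink. This is only a sketch: the class name SalesAggregationSketch, the constant SALES_CSV (the file content inlined as strings), and the sorted TreeMap result are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java equivalent of: select customerId, sum(amountPaid) from sales group by customerId. */
class SalesAggregationSketch {

    /** The content of sales.csv, inlined so the sketch is self-contained. */
    static final List<String> SALES_CSV = Arrays.asList(
            "transactionId,customerId,itemId,amountPaid",
            "111,1,1,100.0", "112,2,3,505.0", "113,3,3,510.0", "114,4,4,600.0",
            "115,1,2,500.0", "116,1,2,500.0", "117,1,2,500.0", "118,1,2,600.0",
            "119,2,3,400.0", "120,1,2,500.0", "121,1,4,500.0", "122,1,2,500.0",
            "123,1,4,500.0", "124,1,2,600.0");

    /** Sums column 3 (amountPaid) per value of column 1 (customerId), skipping the header. */
    static Map<String, Double> totalByCustomer(List<String> csvLines) {
        return csvLines.stream()
                .skip(1)
                .map(line -> line.split(","))
                .collect(Collectors.groupingBy(
                        fields -> fields[1],
                        TreeMap::new,
                        Collectors.summingDouble(fields -> Double.parseDouble(fields[3]))));
    }

    public static void main(String[] args) {
        // Prints the totals ordered by customer id
        System.out.println(totalByCustomer(SALES_CSV));
    }
}
```

The per-customer sums agree with the rows printed by the Flink job; only the ordering differs, because the Flink output order is not sorted.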

Scala code

The Scala project needs the same dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._

object TableSQLAPI {
  case class SalesLog(transactionId: String, customerId: String, itemId: String, amountPaid: Double)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val filePath = "/Users/admin/Downloads/flink/data/sales.csv"
    val csv = env.readCsvFile[SalesLog](filePath, ignoreFirstLine = true, includedFields = Array(0, 1, 2, 3))
    val sales = tableEnv.fromDataSet(csv)
    tableEnv.registerTable("sales", sales)
    val resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaid) money from sales "
      + "group by customerId")
    tableEnv.toDataSet[Row](resultTable).print()
  }
}

Result (logs omitted)

3,510.0
4,600.0
1,4800.0
2,905.0

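Time and windows

Flink distinguishes three notions of time: event time, processing time, and ingestion time (the time at which a record enters Flink).

Event time is usually the one used as the reference.

Setting the time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Event time is normally carried in the record itself as a timestamp field. Flink extracts it and uses it to decide when a window should be evaluated.

Windows are the main tool for processing unbounded streams: they slice the stream into finite buckets, either by time span or by element count. There are two kinds: keyed windows, which are computed per key, and non-keyed windows, which cover the whole stream. The processing pipeline looks like this: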

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
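
To illustrate what a time-based window assigner does, here is a minimal plain-Java sketch (no Flink dependency) that computes which tumbling window a timestamp falls into. It assumes windows aligned to the epoch with no offset; the class name TumblingWindowSketch and its two helper methods are hypothetical names chosen for this example.

```java
/** Plain-Java sketch of tumbling-window assignment; hypothetical helper, not Flink API. */
class TumblingWindowSketch {

    /** Inclusive start of the epoch-aligned window of length sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long timestamp = 1461756862000L; // an event time in milliseconds
        long size = 3000L;               // a 3-second tumbling window
        System.out.println("[" + windowStart(timestamp, size) + ", " + windowEnd(timestamp, size) + ")");
    }
}
```

For the sample values in main this prints [1461756861000, 1461756864000): every timestamp belongs to exactly one half-open window whose start is a multiple of the window size.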

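A window can be triggered by different conditions, for example when it has collected a certain number of elements or when the watermark reaches a certain point. A watermark is a mechanism for measuring the progress of event time.

Watermarks exist to handle out-of-order events, and handling out-of-order events correctly is usually done by combining the watermark mechanism with windows.

Between the moment an event is produced and the moment it reaches an operator, it passes through the source and takes some time on the way. In most cases the data arrives at the operator in the order in which the events were produced, but network delays, backpressure and similar causes can still lead to out-of-order arrival (out-of-order or late elements).

We cannot wait for late elements indefinitely, so there has to be a mechanism guaranteeing that after a certain point the window is triggered and computed. That mechanism is the watermark.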
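One common way to derive a watermark is to track the largest event timestamp seen so far and subtract a fixed out-of-orderness allowance. The following plain-Java sketch (no Flink dependency) shows only that arithmetic; the class name BoundedOutOfOrdernessSketch and its methods are hypothetical, and the starting maximum of 0 is an assumption of this sketch.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark; hypothetical helper, not Flink API. */
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = 0L; // assumption: start at 0 before any event is seen

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp; late events never move the maximum backwards. */
    void onEvent(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
    }

    /** Watermark = largest timestamp seen so far minus the allowed out-of-orderness. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch tracker = new BoundedOutOfOrdernessSketch(10000L);
        long[] events = {1461756862000L, 1461756866000L, 1461756861000L, 1461756872000L};
        for (long event : events) {
            tracker.onEvent(event);
            System.out.println("event=" + event + " watermark=" + tracker.currentWatermark());
        }
    }
}
```

The third event in main arrives late (it is older than the current maximum), so the watermark stays where it was; the watermark only ever moves forward.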

我们从socket接收数据,然后经过map后立刻抽取timetamp并生成watermark,之后应用window来看看watermark和event time如何变化,才导致window被触发的。

public class WindowsApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> input = env.socketTextStream("127.0.0.1", 9999);
        // Convert each input line ("key,timestamp") into a tuple
        SingleOutputStreamOperator<Tuple2<String, Long>> inputMap = input.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] splits = value.split(",");
                return new Tuple2<>(splits[0], Long.parseLong(splits[1]));
            }
        });
        // Extract timestamps and generate watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = inputMap.setParallelism(1)
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
                    private Long currentMaxTimestamp = 0L;
                    // Maximum allowed out-of-orderness: 10 seconds
                    private Long maxOutOfOrderness = 10000L;
                    private Watermark watermark;
                    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                        return watermark;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                        Long timestamp = element.getField(1);
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        // Log: key,timestamp | current max timestamp | most recently generated watermark
                        System.out.println("timestamp:" + element.getField(0) + "," + element.getField(1) + "|" + format.format(new Date((Long) element.getField(1)))
                                + "," + currentMaxTimestamp + "|" + format.format(new Date(currentMaxTimestamp))
                                + "," + watermark.toString() + "|" + format.format(new Date(watermark.getTimestamp())));
                        return timestamp;
                    }
                });
        // The window function below runs only when the watermark condition is met;
        // until then it is never invoked
        watermarks.keyBy(x -> (String) x.getField(0)).timeWindow(Time.seconds(3))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple6<String, Integer, String, String, String, String>, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<Tuple6<String, Integer, String, String, String, String>> out) throws Exception {
                        List<Tuple2<String, Long>> list = (List) input;
                        // Sort the out-of-order timestamps
                        List<Long> collect = list.stream().map(x -> (Long) x.getField(1)).sorted().collect(Collectors.toList());
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        // (key, record count, earliest timestamp, latest timestamp, window start, window end)
                        out.collect(new Tuple6<>(key, list.size(),
                                format.format(collect.get(0)), format.format(collect.get(collect.size() - 1)),
                                format.format(window.getStart()), format.format(window.getEnd())));
                    }
                }).print().setParallelism(1);

        env.execute("WindowsApp");
    }
}

Run nc -lk 9999 in a terminal, start the program, and type the following into the nc session

000001,1461756862000

Output

timestamp:000001,1461756862000|2016-04-27 19:34:22.000,1461756862000|2016-04-27 19:34:22.000,Watermark @ -10000|1970-01-01 07:59:50.000

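The printed watermark is -10000 (the initial maximum timestamp 0 minus the 10-second bound). It was generated before this record updated currentMaxTimestamp, so the watermark shown in a log line is always obtained before the timestamp of the current record is extracted.

Type the next line into nc

000001,1461756866000

Output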

timestamp:000001,1461756866000|2016-04-27 19:34:26.000,1461756866000|2016-04-27 19:34:26.000,Watermark @ 1461756852000|2016-04-27 19:34:12.000

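Because the watermark is obtained first, the watermark printed here, 1461756852000|2016-04-27 19:34:12.000, is the one produced by the first input.

Type the next line into nc

000001,1461756872000

Output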

timestamp:000001,1461756872000|2016-04-27 19:34:32.000,1461756872000|2016-04-27 19:34:32.000,Watermark @ 1461756856000|2016-04-27 19:34:16.000

The timestamp has now reached 19:34:32, 10 seconds later than the first record.

Type the next line into nc

000001,1461756873000

Output

timestamp:000001,1461756873000|2016-04-27 19:34:33.000,1461756873000|2016-04-27 19:34:33.000,Watermark @ 1461756862000|2016-04-27 19:34:22.000

The timestamp has now reached 19:34:33, 11 seconds later than the first record. The window function has still not been triggered.

Type the next line into nc

000001,1461756874000

Output

timestamp:000001,1461756874000|2016-04-27 19:34:34.000,1461756874000|2016-04-27 19:34:34.000,Watermark @ 1461756863000|2016-04-27 19:34:23.000
(000001,1,2016-04-27 19:34:22.000,2016-04-27 19:34:22.000,2016-04-27 19:34:21.000,2016-04-27 19:34:24.000)

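The window function has now been triggered and emits a 6-tuple: key, record count, earliest timestamp, latest timestamp, window start, window end.

Type the next line into nc

000001,1461756876000

Output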

timestamp:000001,1461756876000|2016-04-27 19:34:36.000,1461756876000|2016-04-27 19:34:36.000,Watermark @ 1461756864000|2016-04-27 19:34:24.000

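The watermark printed here (19:34:24.000) was produced by the previous record (19:34:34.000 minus the 10-second bound). It equals the end of the window [19:34:21.000, 19:34:24.000) that holds the first record (19:34:22.000), which is why that window fired once the previous record had been processed.

The trigger conditions are therefore

  1. watermark time >= window_end_time (strictly, Flink's event-time trigger fires as soon as the watermark reaches the window's maximum timestamp, window_end_time - 1 ms)
  2. there is data in [window_start_time, window_end_time)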
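The two conditions can be replayed without Flink. The following is only a sketch in plain Java: the class and method names are made up for illustration, the window start is computed with a simple modulo (assuming no window offset and non-negative timestamps), and the simplified condition `watermark >= window_end_time` from the list above is used.

```java
public class WatermarkTriggerSketch {
    // Start of the tumbling window that contains the timestamp (assumes offset 0).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Watermark as computed by the assigner in the article:
    // largest timestamp seen so far minus the allowed out-of-orderness.
    static long watermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // Condition 1 from the list above.
    static boolean shouldFire(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    // Returns the input timestamp after which the window of inputs[0] fires, or -1 if it never does.
    // Condition 2 holds trivially because that window contains inputs[0].
    static long firstFiringInput(long[] inputs, long windowSize, long maxOutOfOrderness) {
        long windowEnd = windowStart(inputs[0], windowSize) + windowSize;
        long maxTs = 0L;
        for (long ts : inputs) {
            maxTs = Math.max(maxTs, ts);
            if (shouldFire(watermark(maxTs, maxOutOfOrderness), windowEnd)) {
                return ts;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // The timestamps typed into nc in the walkthrough above
        long[] inputs = {1461756862000L, 1461756866000L, 1461756872000L, 1461756873000L, 1461756874000L};
        System.out.println("window start: " + windowStart(inputs[0], 3000L));
        System.out.println("fires after input: " + firstFiringInput(inputs, 3000L, 10000L));
    }
}
```

With the five inputs from the walkthrough, the window containing the first record starts at 1461756861000 (19:34:21) and fires after the record with timestamp 1461756874000, which matches the log above.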

Scala code

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

object WindowsApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val input = env.socketTextStream("127.0.0.1",9999)
    val inputMap = input.map(f => {
      val splits = f.split(",")
      (splits(0), splits(1).toLong)
    })
    val watermarks = inputMap.setParallelism(1).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
      var currentMaxTimestamp = 0L
      var maxOutofOrderness = 10000L
      var watermark: Watermark = null
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark: Watermark = {
        watermark = new Watermark(currentMaxTimestamp - maxOutofOrderness)
        watermark
      }

      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
        val timestamp = element._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        println("timestamp:" + element._1 + "," + element._2 + "|" + format.format(element._2) +
          "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) +
          "," + watermark.toString + "|" + format.format(watermark.getTimestamp))
        timestamp
      }
    })
    watermarks.keyBy(_._1).timeWindow(Time.seconds(3))
      .apply(new WindowFunction[(String,Long),(String,Int,String,String,String,String),String,TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Int, String, String, String, String)]): Unit = {
          val list = input.toList.sortBy(_._2)
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
          out.collect((key,input.size,format.format(list.head._2),format.format(list.last._2),format.format(window.getStart),format.format(window.getEnd)))
        }
      }).print().setParallelism(1)
    env.execute("WindowsApp")
  }
}

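Tumbling windows and sliding windows

A tumbling window is a fixed-size time slice that does not overlap with its neighbours. Every record that falls into a slice is computed by that window, and only by that window. The example above uses a tumbling window.

In code it can be written as

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

or, in shorthand,

.timeWindow(Time.seconds(5))

Note that the shorthand follows the time characteristic set on the environment: it is an event-time window only when TimeCharacteristic.EventTime has been set, and a processing-time window otherwise.

A sliding window is a time slice that may overlap with others. The same record can fall into several windows, and each window computes all the records that fall into its own slice.

In code it can be written as

.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

or, in shorthand,

.timeWindow(Time.seconds(10),Time.seconds(5))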

public class SliderWindowsApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token,1)));
            }
        }).keyBy(0)
                // 10-second windows that slide every 5 seconds
                .timeWindow(Time.seconds(10),Time.seconds(5))
                .sum(1).print().setParallelism(1);
        env.execute("SliderWindowsApp");
    }
}

Console input

a,b,c,d,e,f
a,b,c,d,e,f
a,b,c,d,e,f

Output

(d,3)
(a,3)
(e,3)
(f,3)
(c,3)
(b,3)
(c,3)
(f,3)
(b,3)
(d,3)
(e,3)
(a,3)

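The output shows that every record was computed twice: with a 10-second window sliding every 5 seconds, each record belongs to two overlapping windows.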
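Why each record is counted twice can be seen by listing the windows a single timestamp falls into. The sketch below is plain Java and does not use Flink; the class name, the method name and the sample timestamp are illustrative assumptions, and the window offset is assumed to be 0.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Start timestamps of all sliding windows [start, start + size) that contain the timestamp.
    // Assumes a non-negative timestamp and a window offset of 0.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 12 s with size = 10 s and slide = 5 s
        System.out.println(windowStarts(12000L, 10000L, 5000L)); // prints [10000, 5000]
    }
}
```

The record at 12 s belongs to the windows [10 s, 20 s) and [5 s, 15 s), so its count is emitted once by each of them. In general a record belongs to size / slide windows when size is a multiple of slide.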

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object SliderWindowsApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1",9999)
    text.flatMap(_.split(","))
      .map((_,1))
      .keyBy(0)
      .timeWindow(Time.seconds(10),Time.seconds(5))
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("SliderWindowsApp")
  }
}

Window Functions

ReduceFunction

This is an incremental function: it does not wait to process all the records of a window together, but folds each record into the running result as it arrives.

public class ReduceWindowsApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap((FlatMapFunction<String,String>)(f, collector) -> {
            String[] splits = f.split(",");
            Stream.of(splits).forEach(collector::collect);
        }).returns(String.class)
                // Use the constant key 1 so that all numbers end up in the same group
                .map(new MapFunction<String, Tuple2<Integer,Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String value) throws Exception {
                        return new Tuple2<>(1,Integer.parseInt(value));
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                // Incrementally add each new value to the running sum
                .reduce((x,y) -> new Tuple2<>(x.getField(0),(int)x.getField(1) + (int)y.getField(1)))
                .print().setParallelism(1);
        env.execute("ReduceWindowsApp");
    }
}

Console input

1,2,3,4,5
7,8,9

Output

(1,15)
(1,24)

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object ReduceWindowsApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1",9999)
    text.flatMap(_.split(","))
      .map(x => (1,x.toInt))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .reduce((x,y) => (x._1,x._2 + y._2))
      .print()
      .setParallelism(1)
    env.execute("ReduceWindowsApp")
  }
}

ProcessWindowFunction

This is a full-window function: it receives all the records of a time window and processes them together.

public class ProcessWindowsApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap((FlatMapFunction<String,String>)(f, collector) -> {
            String[] splits = f.split(",");
            Stream.of(splits).forEach(collector::collect);
        }).returns(String.class)
                .map(new MapFunction<String, Tuple2<Integer,Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String value) throws Exception {
                        return new Tuple2<>(1,Integer.parseInt(value));
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements,
                                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        // All elements of the window are available here at once
                        List<Tuple2<Integer,Integer>> list = (List) elements;
                        out.collect(list.stream()
                                .reduce((x, y) -> new Tuple2<>(x.getField(0), (int) x.getField(1) + (int) y.getField(1)))
                                .get());
                    }
                }).print().setParallelism(1);
        env.execute("ProcessWindowsApp");
    }
}

Console input

1,2,3,4,5
7,8,9

Output (here both lines fell into the same 5-second window, so a single sum is emitted)

(1,39)
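The difference between the incremental and the full-window style can be shown without Flink. This is a plain-Java sketch only; the class and method names are hypothetical and it models a single window whose contents are the eight numbers typed above.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAggregationSketch {
    // Incremental style (like a reduce function): only the running result is kept,
    // and each element is folded into it as soon as it arrives.
    static int incrementalSum(int[] elements) {
        Integer acc = null;
        for (int e : elements) {
            acc = (acc == null) ? e : acc + e;
        }
        return acc == null ? 0 : acc;
    }

    // Full-window style (like a process window function): every element is buffered
    // and the result is computed once, when the window is evaluated.
    static int bufferedSum(int[] elements) {
        List<Integer> buffer = new ArrayList<>();
        for (int e : elements) {
            buffer.add(e);
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] window = {1, 2, 3, 4, 5, 7, 8, 9};
        System.out.println(incrementalSum(window)); // prints 39
        System.out.println(bufferedSum(window));    // prints 39
    }
}
```

Both styles produce the same sum. The incremental one keeps a single value per window, while the full-window one must hold every element until the window fires, which is the price paid for having access to all elements at once.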

Scala code

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

object ProcessWindowsApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1",9999)
    text.flatMap(_.split(","))
      .map(x => (1,x.toInt))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .process(new ProcessWindowFunction[(Int,Int),(Int,Int),Tuple,TimeWindow] {
        override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[(Int, Int)]): Unit = {
          val list = elements.toList
          out.collect(list.reduce((x,y) => (x._1,x._2 + y._2)))
        }
      })
      .print()
      .setParallelism(1)
    env.execute("ProcessWindowsApp")
  }
}

Connector

Flink ships with many built-in connectors for data sources and sinks. At the time of writing they include

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)

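HDFS Connector

This connector writes a data stream to the Hadoop HDFS distributed file system. To use it, add the following dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

Choose the versions that match your own environment. My Hadoop version is

<hadoop.version>2.8.1</hadoop.version>

Create a new, empty folder named hdfssink under data

public class FileSystemSinkApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        String filePath = "/Users/admin/Downloads/flink/data/hdfssink";
        BucketingSink<String> sink = new BucketingSink<>(filePath);
        // One bucket (sub-directory) per minute
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<>());
        // sink.setBatchSize(1024 * 1024 * 400);
        // Roll over to a new part file every 20 ms
        sink.setBatchRolloverInterval(20);
        data.addSink(sink);
        env.execute("FileSystemSinkApp");
    }
}

I am not writing to a real Hadoop HDFS here but to a local directory (for setting up HDFS, see the article "Hadoop hdfs+Spark配置"). Type anything into the console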

adf
dsdf
wfdgg

A new folder named

2021-01-15--0627

now appears under hdfssink. Inside it there are 3 files

_part-4-0.pending    _part-5-0.pending    _part-6-0.pending

Contents of the three files

(base) -bash-3.2$ cat _part-4-0.pending
adf

(base) -bash-3.2$ cat _part-5-0.pending
dsdf

(base) -bash-3.2$ cat _part-6-0.pending
wfdgg
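The folder name 2021-01-15--0627 comes from the date pattern passed to DateTimeBucketer. As a rough illustration only, the sketch below formats a timestamp with the same pattern in plain Java; the class name, the method, the /tmp/hdfssink base path and the fixed UTC time zone are assumptions made for the example and are not Flink code.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketPathSketch {
    // Builds "<basePath>/<formatted time>", i.e. one directory per distinct formatted value.
    static String bucketPath(String basePath, String pattern, long epochMillis, TimeZone zone) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(zone);
        return basePath + "/" + format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        // Two records within the same minute share a bucket, the next minute gets a new one
        System.out.println(bucketPath("/tmp/hdfssink", "yyyy-MM-dd--HHmm", 0L, utc));
        System.out.println(bucketPath("/tmp/hdfssink", "yyyy-MM-dd--HHmm", 59_000L, utc));
        System.out.println(bucketPath("/tmp/hdfssink", "yyyy-MM-dd--HHmm", 60_000L, utc));
    }
}
```

Because the pattern ends in HHmm, all records written within the same minute land in the same folder. A coarser pattern such as yyyy-MM-dd--HH would give one folder per hour.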

BucketingSink is a subclass of the abstract class RichSinkFunction, just like the custom sink SinkToMySQL written earlier.

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

object FileSystemSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("127.0.0.1",9999)
    val filePath = "/Users/admin/Downloads/flink/data/hdfssink"
    val sink = new BucketingSink[String](filePath)
    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
    sink.setWriter(new StringWriter[String]())
    // sink.setBatchSize(1024 * 1024 * 400)
    sink.setBatchRolloverInterval(20)
    data.addSink(sink)
    env.execute("FileSystemSinkApp")
  }
}

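Kafka Connector

To use the Kafka connector, Kafka must be installed first. Install ZooKeeper 3.4.5 and Kafka 1.1.1.

My Kafka runs on an Alibaba Cloud server, so it needs some extra configuration before it can be reached from a local machine. Edit server.properties in Kafka's config directory

advertised.listeners=PLAINTEXT://<public-ip>:9092
host.name=<private-ip>

Port 9092 must also be opened in the Alibaba Cloud firewall rules.

Start Kafka

 ./kafka-server-start.sh -daemon ../config/server.properties 

Create a topic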

 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pktest

The topic is now visible in ZooKeeper

[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[pktest, __consumer_offsets]

List the topics

./kafka-topics.sh --list --zookeeper localhost:2181

The command returns

[bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
pktest

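Start a producer

./kafka-console-producer.sh --broker-list localhost:9092 --topic pktest

Because Kafka runs on Alibaba Cloud here, the producer has to be started with the public IP instead

./kafka-console-producer.sh --broker-list <public-ip>:9092 --topic pktest

Start a consumer

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pktest

For the same reason, the consumer has to be started with the public IP instead

./kafka-console-consumer.sh --bootstrap-server <public-ip>:9092 --topic pktest

Anything typed into the producer window now shows up in the consumer window.

Kafka as a source: add the dependencies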

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

public class KafkaConnectorConsumerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 4 seconds with exactly-once semantics
        env.enableCheckpointing(4000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","<public-ip>:9092");
        properties.setProperty("group.id","test");
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.print().setParallelism(1);
        env.execute("KafkaConnectorConsumerApp");
    }
}

Result

Input on the server

[bin]# ./kafka-console-producer.sh --broker-list <public-ip>:9092 --topic pktest
>sdfa

Printed by the Flink job

sdfa

Scala code

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode

object KafkaConnectorConsumerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(4000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(10000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "<public-ip>:9092")
    properties.setProperty("group.id","test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.print().setParallelism(1)
    env.execute("KafkaConnectorConsumerApp")
  }
}

Kafka as a sink

public class KafkaConnectorProducerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(4000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","<public-ip>:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(topic,
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        data.addSink(kafkaSink);
        env.execute("KafkaConnectorProducerApp");
    }
}

Console input

sdfae
dffe

Printed on the server

[bin]# ./kafka-console-consumer.sh --bootstrap-server <public-ip>:9092 --topic pktest
sdfae
dffe

Scala code

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(4000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(10000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    val data = env.socketTextStream("127.0.0.1",9999)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "<public-ip>:9092")
    val kafkaSink = new FlinkKafkaProducer[String](topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema), properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    data.addSink(kafkaSink)
    env.execute("KafkaConnectorProducerApp")
  }
}

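Deployment

Standalone deployment

Download: https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop28-scala_2.11.tgz

I am using 1.7.2 here (other versions work as well). After downloading and unpacking, go into the bin directory and run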

./start-cluster.sh

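Open the web UI at http://<public-ip>:8081/

Submit a test job. First start a socket server

nc -lk 9000

Leave the bin directory, go back to its parent directory, and run

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

The running job now shows up in the web UI, and clicking the RUNNING button opens its details.

We can type some characters into nc, for example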

a       f       g       r       a       d
a       f       g       r       a       d
a       f       g       r       a       d

Nothing is printed to the console, however. To see the results, look at the .out file in the log directory

[log]# cat flink-root-taskexecutor-0-iZ7xvi8yoh0wspvk6rjp7cZ.out
a : 6
d : 3
r : 3
g : 3
f : 3
 : 90
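The last line, an empty word counted 90 times, is most likely an artefact of how the input was tokenised. The sketch below reproduces the number in plain Java under two assumptions that the article does not state: that the example job splits each line on single whitespace characters (`\\s`), and that the letters in each input line were separated by seven spaces. The class and method names are hypothetical.

```java
public class EmptyTokenSketch {
    // Builds a string of n spaces.
    static String spaces(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(' ');
        }
        return sb.toString();
    }

    // Counts the empty tokens produced by splitting on single whitespace characters.
    static int countEmptyTokens(String line) {
        int empty = 0;
        for (String token : line.split("\\s")) {
            if (token.isEmpty()) {
                empty++;
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        // One input line: six letters separated by seven spaces each (assumption)
        String line = String.join(spaces(7), "a", "f", "g", "r", "a", "d");
        // Three identical lines were typed into nc
        System.out.println(3 * countEmptyTokens(line)); // prints 90
    }
}
```

Each run of seven spaces yields six empty tokens, five runs per line give 30, and three lines give 90. Splitting on `\\s+` instead would produce no empty tokens.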

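Uploading our own jar

Before uploading, set the class whose main method should be run

com.guanjian.flink.java.test.StreamingJavaApp

Because our code uses port 9999, run

nc -lk 9999

After uploading the jar (into a newly created test directory under the Flink home), run

 ./bin/flink run test/flink-train-java-1.0.jar

Type into nc

a d e g a d g f

In the log directory, run

cat flink-root-taskexecutor-0-iZ7xvi8yoh0wspvk6rjp7cZ.out

Besides the earlier records, several new ones have been appended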

a : 6
d : 3
r : 3
g : 3
f : 3
 : 90
(a,2)
(f,1)
(g,2)
(e,1)
(d,2)

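YARN cluster deployment

Deploying on a YARN cluster requires Hadoop to be installed first. My Hadoop version is 2.8.1.

Go to the etc/hadoop folder under the Hadoop installation directory.

Start with hadoop-env.sh, where JAVA_HOME has to be set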

export JAVA_HOME=/home/soft/jdk1.8.0_161

core-site.xml

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://host1:8020</value>
</property>
</configuration>

hdfs-site.xml

<configuration>
<property>
    <name>dfs.namenode.name.dir</name>
    <value>/opt/hadoop2/tmp/dfs/name</value>
</property>
<property>
    <name>dfs.datanode.data.dir</name>
    <value>/opt/hadoop2/tmp/dfs/data</value>
</property>
</configuration>

These two directories have to be created

mkdir -p /opt/hadoop2/tmp/dfs/name
mkdir -p /opt/hadoop2/tmp/dfs/data

yarn-site.xml

<configuration>
<!-- Site specific YARN configuration properties -->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>host1</value>
</property>
<property> 
    <name>yarn.nodemanager.vmem-check-enabled</name> 
    <value>false</value> 
</property> 
</configuration>

mapred-site.xml

<configuration>
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
</configuration>

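After Hadoop has been started, the HDFS page is available on port 50070 and the YARN page on port 8088.

Before deploying Flink on YARN, HADOOP_HOME must be configured (the snippet below also sets JAVA_HOME)

vim /etc/profile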

JAVA_HOME=/home/soft/jdk1.8.0_161
HADOOP_HOME=/home/soft/hadoop-2.8.1
JRE_HOME=${JAVA_HOME}/jre
CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin
PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:$PATH
export JAVA_HOME HADOOP_HOME PATH

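After saving, run source /etc/profile

First YARN deployment mode (a long-running YARN session)

In Flink's bin directory, run

./yarn-session.sh -n 1 -jm 1024m -tm 1024m

-n :    number of TaskManagers

-jm:   JobManager memory

-tm:  TaskManager memory

The session now appears in the YARN web UI (port 8088).

After adding host1's IP address to /etc/hosts on the machine you browse from, click ApplicationMaster to open the Flink dashboard.

Submitting a job

Upload a file to the HDFS root directory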

hdfs dfs -put LICENSE-2.0.txt /

Submit the job from Flink's bin directory

./flink run ../examples/batch/WordCount.jar -input hdfs://host1:8020/LICENSE-2.0.txt -output hdfs://host1:8020/wordcount-result.txt

When the job has finished, list the files in HDFS

[bin]# hdfs dfs -ls /
Found 4 items
-rw-r--r--   3 root supergroup      11358 2021-01-24 14:47 /LICENSE-2.0.txt
drwxr-xr-x   - root supergroup          0 2021-01-24 09:43 /abcd
drwxr-xr-x   - root supergroup          0 2021-01-24 14:17 /user
-rw-r--r--   3 root supergroup       4499 2021-01-24 15:06 /wordcount-result.txt

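The job is also listed in the Flink dashboard.

Second YARN deployment mode (one YARN application per job)

Before trying the second mode, stop the session that was started for the first mode

yarn application -kill application_1611471412139_0001

In Flink's bin directory, run

./flink run -m yarn-cluster -yn 1 ../examples/batch/WordCount.jar

-m :    the target cluster; yarn-cluster is a fixed value

-yn:    number of TaskManagers

The job also appears in the YARN web UI.

To submit our own job, change the socket host in the code to host1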

public class StreamingJavaApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("host1",9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token,1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .sum(1).print();
        env.execute("StreamingJavaApp");
    }
}

Start nc, then submit the job

nc -lk 9999

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Input

[~]# nc -lk 9999              
a v d t e a d e f
a v d t e a d e f
a v d t e a d e f
a v d t e a d e f

Viewing the result

The result can be viewed in the Flink web UI.

State

  1. State refers to the state of one specific Task/Operator
  2. State data is kept in the JVM (on the heap) by default
  3. Two kinds: Keyed State & Operator State

Keyed State

/**
 * For a stream of records, emit the average of every two values
 */
public class KeyedStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3), new Tuple2<>(1,5), new Tuple2<>(1,7), new Tuple2<>(1,4), new Tuple2<>(1,2)))
                .keyBy(ele -> ele.getField(0))
                .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
                    // Per-key state holding (count, sum)
                    private transient ValueState<Tuple2<Integer,Integer>> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("avg",
                                TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() { })));
                    }

                    @Override
                    public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        Tuple2<Integer, Integer> tmpState = state.value();
                        Tuple2<Integer,Integer> currentState = tmpState == null ? Tuple2.of(0,0) : tmpState;
                        Tuple2<Integer,Integer> newState = new Tuple2<>((int) currentState.getField(0) + 1,
                                (int) currentState.getField(1) + (int) value.getField(1));
                        state.update(newState);
                        // After two records, emit the average and reset the state
                        if ((int) newState.getField(0) >= 2) {
                            out.collect(new Tuple2<>(value.getField(0),(int) newState.getField(1) / (int) newState.getField(0)));
                            state.clear();
                        }
                    }
                }).print().setParallelism(1);
        env.execute("KeyedStateApp");
    }
}

Output

(1,4)
(1,5)
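The state logic can be traced by hand with a plain-Java stand-in. In this sketch two local variables play the role of the ValueState holding (count, sum) for the single key 1; the class and method names are illustrative and nothing here uses Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class PairAverageSketch {
    // Emits the integer average after every second value, then resets the state.
    static List<Integer> averages(int[] values) {
        List<Integer> out = new ArrayList<>();
        int count = 0; // stands in for field 0 of the state tuple
        int sum = 0;   // stands in for field 1 of the state tuple
        for (int v : values) {
            count++;
            sum += v;
            if (count >= 2) {
                out.add(sum / count);
                count = 0; // equivalent of state.clear()
                sum = 0;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(averages(new int[]{3, 5, 7, 4, 2})); // prints [4, 5]
    }
}
```

The first pair gives (3 + 5) / 2 = 4 and the second gives (7 + 4) / 2 = 5 with integer division. The last value, 2, stays in the state and produces no output, which is why only two results appear.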

Scala code

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala.createTypeInformation

object KeyedStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {
        var state: ValueState[(Int,Int)] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getState(new ValueStateDescriptor[(Int, Int)]("avg",
            createTypeInformation[(Int,Int)]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {
          val tmpState = state.value()
          val currentState = if (tmpState != null) {
            tmpState
          } else {
            (0,0)
          }
          val newState = (currentState._1 + 1,currentState._2 + value._2)
          state.update(newState)
          if (newState._1 >= 2) {
            out.collect((value._1,newState._2 / newState._1))
            state.clear()
          }
        }
      }).print().setParallelism(1)
    env.execute("KeyedStateApp")
  }
}

Output

(1,4)
(1,5)

Reducing State

/**
 * Count the records and sum their values
 */
public class ReducingStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3), new Tuple2<>(1,5), new Tuple2<>(1,7), new Tuple2<>(1,4), new Tuple2<>(1,2)))
                .keyBy(ele -> ele.getField(0))
                .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
                    private transient ReducingState<Tuple2<Integer,Integer>> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        state = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>("sum",
                                new ReduceFunction<Tuple2<Integer, Integer>>() {
                                    @Override
                                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                                        // Field 0 counts the records, field 1 accumulates the sum
                                        Tuple2<Integer,Integer> tuple2 = new Tuple2<>((int) value1.getField(0) + 1,
                                                (int) value1.getField(1) + (int) value2.getField(1));
                                        return tuple2;
                                    }
                                }, TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})));
                    }

                    @Override
                    public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        Tuple2<Integer,Integer> tuple2 = new Tuple2<>(value.getField(0), value.getField(1));
                        state.add(tuple2);
                        out.collect(new Tuple2<>(state.get().getField(0),state.get().getField(1)));
                    }
                }).print().setParallelism(1);
        env.execute("ReducingStateApp");
    }
}

Output

(2,8)
(3,15)
(4,19)
(5,21)

Scala code

import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object ReducingStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {
        var state: ReducingState[(Int,Int)] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getReducingState(new ReducingStateDescriptor[(Int, Int)]("sum",
            new ReduceFunction[(Int, Int)] {
              override def reduce(value1: (Int, Int), value2: (Int, Int)): (Int, Int) = {
                (value1._1 + 1,value1._2 + value2._2)
              }
            },
            createTypeInformation[(Int,Int)]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {
          val tuple2 = (value._1,value._2)
          state.add(tuple2)
          out.collect((state.get()._1,state.get()._2))
        }
      }).print().setParallelism(1)
    env.execute("ReducingStateApp")
  }
}

Output

(2,8)
(3,15)
(4,19)
(5,21)

List State

/**
 * Get the position of each record
 */
public class ListStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3), new Tuple2<>(1,5), new Tuple2<>(1,7), new Tuple2<>(1,4), new Tuple2<>(1,2)))
                .keyBy(ele -> ele.getField(0))
                .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple3<Integer,Integer,Integer>>() {
                    private transient ListState<Integer> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        state = getRuntimeContext().getListState(new ListStateDescriptor<>("list",
                                TypeInformation.of(new TypeHint<Integer>() {})));
                    }

                    @Override
                    public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple3<Integer, Integer,Integer>> out) throws Exception {
                        // Append the key (always 1 here), then sum the list to get the record's position
                        state.add(value.getField(0));
                        Iterator<Integer> iterator = state.get().iterator();
                        Integer l = 0;
                        while (iterator.hasNext()) {
                            l += iterator.next();
                        }
                        Tuple3<Integer,Integer,Integer> tuple3 = new Tuple3<>(value.getField(0),value.getField(1),l);
                        out.collect(tuple3);
                    }
                }).print().setParallelism(1);
        env.execute("ListStateApp");
    }
}

Output

(1,3,1)
(1,5,2)
(1,7,3)
(1,4,4)
(1,2,5)

Scala code

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object ListStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int,Int)] {
        var state: ListState[Int] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getListState(new ListStateDescriptor[Int]("list",
            createTypeInformation[Int]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int, Int)]) = {
          state.add(value._1)
          val iterator = state.get().iterator()
          var l: Int = 0
          while (iterator.hasNext) {
            l += iterator.next()
          }
          val tuple3 = (value._1,value._2,l)
          out.collect(tuple3)
        }
      }).print().setParallelism(1)
    env.execute("ListStateApp")
  }
}

Output

(1,3,1)
(1,5,2)
(1,7,3)
(1,4,4)
(1,2,5)

Fold State

/**
 * Count the records, starting from an initial value
 */
public class FoldStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3), new Tuple2<>(1,5), new Tuple2<>(1,7), new Tuple2<>(1,4), new Tuple2<>(1,2)))
                .keyBy(ele -> ele.getField(0))
                .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple3<Integer,Integer,Integer>>() {
                    private transient FoldingState<Integer,Integer> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Initial accumulator value is 1; every added value is summed into it
                        state = getRuntimeContext().getFoldingState(new FoldingStateDescriptor<Integer, Integer>("fold",
                                1, (accumulator, value) -> accumulator + value,
                                TypeInformation.of(new TypeHint<Integer>() {})
                        ));
                    }

                    @Override
                    public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple3<Integer, Integer,Integer>> out) throws Exception {
                        state.add(value.getField(0));
                        out.collect(new Tuple3<>(value.getField(0),value.getField(1),state.get()));
                    }
                }).print().setParallelism(1);
        env.execute("FoldStateApp");
    }
}

Output

(1,3,2)
(1,5,3)
(1,7,4)
(1,4,5)
(1,2,6)
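The third column of this output is simply a fold over the keys. The plain-Java sketch below mirrors it with a local accumulator instead of a FoldingState; the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntBinaryOperator;

public class FoldSketch {
    // Applies the fold function to the accumulator for each input and records every intermediate value.
    static List<Integer> foldEach(int initial, int[] inputs, IntBinaryOperator fold) {
        List<Integer> out = new ArrayList<>();
        int accumulator = initial;
        for (int input : inputs) {
            accumulator = fold.applyAsInt(accumulator, input);
            out.add(accumulator);
        }
        return out;
    }

    public static void main(String[] args) {
        // The example adds the key (always 1) to an accumulator that starts at 1
        int[] keys = {1, 1, 1, 1, 1};
        System.out.println(foldEach(1, keys, (acc, value) -> acc + value)); // prints [2, 3, 4, 5, 6]
    }
}
```

Because the accumulator starts at 1 rather than 0, the first record already yields 2, which is why the counts in the output run from 2 to 6.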

Scala code

import org.apache.flink.api.common.functions.{FoldFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{FoldingState, FoldingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object FoldStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int,Int)] {
        var state: FoldingState[Int,Int] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getFoldingState(new FoldingStateDescriptor[Int,Int]("fold",
            1, new FoldFunction[Int,Int] {
              override def fold(accumulator: Int, value: Int) = {
                accumulator + value
              }
            },
            createTypeInformation[Int]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int, Int)]) = {
          state.add(value._1)
          out.collect((value._1,value._2,state.get()))
        }
      }).print().setParallelism(1)
    env.execute("FoldStateApp")
  }
}

Output

(1,3,2)
(1,5,3)
(1,7,4)
(1,4,5)
(1,2,6)
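The third column of the Fold State output can be checked without a Flink cluster. The sketch below is plain Java, not Flink API: `FoldSketch` and `runningFold` are hypothetical names that only mimic what a folding state does for a single key, namely start from an initial value and apply the fold function once per added element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper, not part of Flink: models a folding state for one key.
class FoldSketch {

    /** Folds every input into the accumulator and records the accumulator after each step. */
    static List<Integer> runningFold(int initial, List<Integer> inputs,
                                     BiFunction<Integer, Integer, Integer> fold) {
        List<Integer> results = new ArrayList<>();
        int accumulator = initial;
        for (int value : inputs) {
            accumulator = fold.apply(accumulator, value);
            results.add(accumulator);
        }
        return results;
    }

    public static void main(String[] args) {
        // The example adds the key (always 1) to the state, once per record.
        List<Integer> keys = Arrays.asList(1, 1, 1, 1, 1);
        System.out.println(runningFold(1, keys, (acc, v) -> acc + v)); // [2, 3, 4, 5, 6]
    }
}
```

Because the example feeds the key rather than the value into the state, the result is a record counter that starts at the initial value 1.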

 Map State

/**
 * Add the previous record's value to each record; the first record keeps its own value.
 */
public class MapStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3), new Tuple2<>(1,5), new Tuple2<>(1,7), new Tuple2<>(1,4), new Tuple2<>(1,2)))
            .keyBy(ele -> ele.getField(0))
            .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
                private transient MapState<Integer,Integer> state;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    state = getRuntimeContext().getMapState(new MapStateDescriptor<>("map",
                            TypeInformation.of(new TypeHint<Integer>() {}),
                            TypeInformation.of(new TypeHint<Integer>() {})));
                }

                @Override
                public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    // Previous value stored for this key, or 0 for the first record
                    Integer tmp = state.get(value.getField(0));
                    Integer current = tmp == null ? 0 : tmp;
                    state.put(value.getField(0),value.getField(1));
                    Tuple2<Integer,Integer> tuple2 = new Tuple2<>(value.getField(0), current + (int) value.getField(1));
                    out.collect(tuple2);
                }
            }).print().setParallelism(1);
        env.execute("MapStateApp");
    }
}

Output

(1,3)
(1,8)
(1,12)
(1,11)
(1,6)

Scala code

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala.createTypeInformation

object MapStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {
        var state: MapState[Int,Int] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getMapState(new MapStateDescriptor[Int,Int]("map",
            createTypeInformation[Int], createTypeInformation[Int]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {
          // A Scala Int is never null, so check whether the key exists instead of comparing with null
          val current: Int = if (state.contains(value._1)) state.get(value._1) else 0
          state.put(value._1,value._2)
          val tuple2 = (value._1,current + value._2)
          out.collect(tuple2)
        }
      }).print().setParallelism(1)
    env.execute("MapStateApp")
  }
}

Output

(1,3)
(1,8)
(1,12)
(1,11)
(1,6)
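The Map State logic can be reproduced in plain Java to see where each number comes from. This is only a sketch under simple assumptions: a `HashMap` stands in for the keyed map state, and `PreviousValueSketch` and `addPrevious` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: a HashMap plays the role of the map state.
class PreviousValueSketch {

    /** Emits value + previous value for every record; the first record is emitted unchanged. */
    static List<Integer> addPrevious(int key, List<Integer> values) {
        Map<Integer, Integer> state = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (int value : values) {
            Integer previous = state.get(key);          // null for the first record
            int current = previous == null ? 0 : previous;
            state.put(key, value);                      // remember this record for the next one
            out.add(current + value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(addPrevious(1, Arrays.asList(3, 5, 7, 4, 2))); // [3, 8, 12, 11, 6]
    }
}
```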

 Aggregating State

/**
 * Compute the average of each record together with all records before it.
 */
public class AggregatingStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3), new Tuple2<>(1,5), new Tuple2<>(1,7), new Tuple2<>(1,4), new Tuple2<>(1,2)))
            .keyBy(ele -> ele.getField(0))
            .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
                private transient AggregatingState<Integer,Integer> state;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // The accumulator is a (sum, count) pair
                    state = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("agg",
                        new AggregateFunction<Integer, Tuple2<Integer,Integer>, Integer>() {
                            @Override
                            public Tuple2<Integer, Integer> createAccumulator() {
                                return new Tuple2<>(0,0);
                            }

                            @Override
                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                return new Tuple2<>((int) accumulator.getField(0) + value,
                                        (int) accumulator.getField(1) + 1);
                            }

                            @Override
                            public Integer getResult(Tuple2<Integer, Integer> accumulator) {
                                // Integer division, so the average is truncated
                                return (int) accumulator.getField(0) / (int) accumulator.getField(1);
                            }

                            @Override
                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                return new Tuple2<>((int) a.getField(0) + (int) b.getField(0),
                                        (int) a.getField(1) + (int) b.getField(1));
                            }
                        }, TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>() {})));
                }

                @Override
                public void flatMap(Tuple2<Integer, Integer> value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    state.add(value.getField(1));
                    Tuple2<Integer,Integer> tuple2 = new Tuple2<>(value.getField(0), state.get());
                    out.collect(tuple2);
                }
            }).print().setParallelism(1);
        env.execute("AggregatingStateApp");
    }
}

Output

(1,3)
(1,4)
(1,5)
(1,4)
(1,4)

Scala code

import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object AggregatingStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {
        var state: AggregatingState[Int,Int] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[Int,(Int,Int),Int]("agg",
            new AggregateFunction[Int,(Int,Int),Int] {
              override def add(value: Int, accumulator: (Int, Int)) = {
                (accumulator._1 + value,accumulator._2 + 1)
              }

              override def createAccumulator() = {
                (0,0)
              }

              override def getResult(accumulator: (Int, Int)) = {
                accumulator._1 / accumulator._2
              }

              override def merge(a: (Int, Int), b: (Int, Int)) = {
                (a._1 + b._1,a._2 + b._2)
              }
            }, createTypeInformation[(Int,Int)]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {
          state.add(value._2)
          val tuple2 = (value._1,state.get())
          out.collect(tuple2)
        }
      }).print().setParallelism(1)
    env.execute("AggregatingStateApp")
  }
}

Output

(1,3)
(1,4)
(1,5)
(1,4)
(1,4)
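The averages in the Aggregating State output are truncated because the accumulator holds an integer sum and count. The plain-Java sketch below walks through the same (sum, count) accumulator by hand; `RunningAverageSketch` and `runningAverage` are hypothetical names used only for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: mirrors the (sum, count) accumulator of the example.
class RunningAverageSketch {

    /** Returns the truncated average of all values seen so far, once per input value. */
    static List<Integer> runningAverage(List<Integer> values) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;    // first accumulator field
        int count = 0;  // second accumulator field
        for (int value : values) {
            sum += value;          // corresponds to add(...)
            count += 1;
            out.add(sum / count);  // corresponds to getResult(...), integer division
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runningAverage(Arrays.asList(3, 5, 7, 4, 2))); // [3, 4, 5, 4, 4]
    }
}
```

The fourth and fifth results come from 19 / 4 and 21 / 5, both of which truncate to 4.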

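Checkpoint mechanism

Any operator in Flink can be stateful. The checkpoint mechanism makes that state fault tolerant by persisting it. From a checkpoint, Flink can restore both the state and the position consumed in the stream, so the application behaves as if it had run without failures.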
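To make "restore the state and the position in the stream" concrete, here is a toy model in plain Java. It is a sketch under strong simplifications, not how Flink implements checkpoints: a single operator sums an array, a checkpoint stores the pair (read offset, sum) after every few records, and one simulated crash rolls both back together. `CheckpointSketch` and `run` are hypothetical names.

```java
// Toy model, not Flink code: one operator, one crash, checkpoints taken every N records.
class CheckpointSketch {

    /**
     * Sums the records. If failAtIndex is a valid index, the run "crashes" once just before
     * processing that record and resumes from the last checkpoint.
     */
    static int run(int[] records, int checkpointEvery, int failAtIndex) {
        int savedOffset = 0; // read position stored in the last checkpoint
        int savedSum = 0;    // operator state stored in the last checkpoint
        int offset = 0;
        int sum = 0;
        boolean failed = false;
        while (offset < records.length) {
            if (!failed && offset == failAtIndex) {
                // Simulated crash: progress since the last checkpoint is lost,
                // state and read position are restored together.
                failed = true;
                offset = savedOffset;
                sum = savedSum;
                continue;
            }
            sum += records[offset];
            offset++;
            if (offset % checkpointEvery == 0) {
                savedOffset = offset;
                savedSum = sum;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] records = {3, 5, 7, 4, 2};
        System.out.println(run(records, 2, -1)); // no failure: 21
        System.out.println(run(records, 2, 3));  // one failure, replay from the checkpoint: 21
    }
}
```

Restoring the offset together with the sum is what keeps the replayed record from being counted twice.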

Checkpointing is disabled by default and has to be enabled explicitly.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing, with one checkpoint every 4 seconds
env.enableCheckpointing(4000);
// Checkpointing mode: exactly-once is the default and suits most applications.
// The alternative, CheckpointingMode.AT_LEAST_ONCE, is typically used for very low latency.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint timeout, here 10 seconds
env.getCheckpointConfig().setCheckpointTimeout(10000);
// Maximum number of checkpoints in progress at the same time (one or more)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        DataStreamSource<String> stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            // Any line containing "pk" simulates a failure
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

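Normally the program would exit right away if nc -lk 9999 is not running. With checkpointing enabled, however, it keeps trying to connect to port 9999 instead of exiting: when checkpointing is on and no restart strategy is configured, Flink applies a default restart strategy with Integer.MAX_VALUE attempts, so the following log keeps repeating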

java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    val stream = env.socketTextStream("127.0.0.1",9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

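Restart strategy

As we just saw, when no restart strategy is set, enabling checkpointing brings a default one with Integer.MAX_VALUE attempts and a delay of 1 second. To restart only twice, a restart strategy has to be configured explicitly, either in Flink's configuration file or in code.

For example, edit flink-conf.yaml in Flink's conf directory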

restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 5 s

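After a failure the job is restarted at most 2 times, with 5 seconds between attempts. These two keys are read only when restart-strategy: fixed-delay is set as well.

Setting it in code

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));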

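Note that the default depends on checkpointing: without checkpointing and without an explicitly configured strategy, Flink does not restart a failed job at all. A strategy that is set explicitly applies either way, but only with checkpointing enabled does the restarted job recover its state.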

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Restart at most 2 times, waiting 5 seconds before each restart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Start nc -lk 9999 first and then run the program. When pk is typed into the nc console twice, the program throws an exception each time

java.lang.RuntimeException: 出bug了...
    at com.guanjian.flink.java.test.CheckpointApp.lambda$main$95f17bfa$1(CheckpointApp.java:18)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

but it does not die. The third time pk is entered, the job fails for good.
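The "two restarts, then fail" behaviour follows from simple attempt counting. The sketch below models only that counting in plain Java and leaves out the 5-second delay; `FixedDelaySketch` and `onFailure` are hypothetical names for illustration, not Flink classes.

```java
// Hypothetical model of a fixed-delay restart policy; the delay itself is not simulated.
class FixedDelaySketch {
    private final int maxAttempts;
    private int usedAttempts = 0;

    FixedDelaySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the job is restarted after this failure, false if it fails for good. */
    boolean onFailure() {
        if (usedAttempts < maxAttempts) {
            usedAttempts++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FixedDelaySketch strategy = new FixedDelaySketch(2);
        for (int failure = 1; failure <= 3; failure++) {
            String outcome = strategy.onFailure() ? "restart" : "job fails";
            System.out.println("failure " + failure + " -> " + outcome);
        }
        // failure 1 -> restart
        // failure 2 -> restart
        // failure 3 -> job fails
    }
}
```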

Scala code

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))
    val stream = env.socketTextStream("127.0.0.1",9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

StateBackend

By default the checkpointed state is kept in memory, so once the program dies and is started again, all earlier state is lost. Suppose we had typed the following into nc

a,a,a

Take the earlier CheckpointApp and modify it slightly

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                // Split on commas and emit (token, 1) for every token
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token,1)));
            }
        }).keyBy(0).sum(1)
          .print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

The output is

(a,1)
(a,2)
(a,3)

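That is as expected. If the program now dies and we start it again and repeat the same input, the output is the same once more, because the counts start from zero.

Suppose that is not what we want, and the result should instead be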

(a,4)
(a,5)
(a,6)

That is, the counts from before the failure are kept and accumulation continues.

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Store state snapshots in a file system (HDFS here)
        env.setStateBackend(new FsStateBackend("hdfs://172.18.114.236:8020/backend"));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("host1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token,1)));
            }
        }).keyBy(0).sum(1)
          .print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

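Set the main class in the pom

<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>com.guanjian.flink.java.test.CheckpointApp</mainClass>
</transformer>

Package the job and upload it to the test directory of the Flink installation on the server.

Edit flink-conf.yaml in Flink's conf directory and add the following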

state.backend: filesystem
state.checkpoints.dir: hdfs://172.18.114.236:8020/backend
state.savepoints.dir: hdfs://172.18.114.236:8020/backend

Create the backend directory in HDFS

hdfs dfs -mkdir /backend

Restart Flink, then start

nc -lk 9999

For the first run, submit the job the same way as before

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Enter the same input as before

a,a,a

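Now cancel the submitted job. HDFS then contains a directory whose name is a long string of hex digits.

Start the program again, this time passing the retained checkpoint with -s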

./flink run -s hdfs://172.18.114.236:8020/backend/4db93b564e17b3806230f7c2d053121e/chk-5 -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar 

Then continue typing into nc

a,a,a

The output now matches the expected result: the counts continue from (a,4).
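The difference between a fresh start and a start from a retained checkpoint can be illustrated in plain Java. This is only a sketch: a `HashMap` stands in for the keyed state, copying it stands in for the checkpoint written to HDFS, and `RestoreSketch` and `process` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: the map is the keyed state, a copy of it is the retained checkpoint.
class RestoreSketch {

    /** Counts comma-separated tokens against the given state and returns the emitted records. */
    static List<String> process(Map<String, Integer> state, String line) {
        List<String> out = new ArrayList<>();
        for (String token : line.split(",")) {
            int count = state.merge(token, 1, Integer::sum);
            out.add("(" + token + "," + count + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> running = new HashMap<>();
        process(running, "a,a,a");                               // first run
        Map<String, Integer> checkpoint = new HashMap<>(running); // snapshot kept after cancel

        // Restart without a checkpoint: state is empty again.
        System.out.println(process(new HashMap<>(), "a,a,a"));           // [(a,1), (a,2), (a,3)]
        // Restart from the snapshot: counting continues.
        System.out.println(process(new HashMap<>(checkpoint), "a,a,a")); // [(a,4), (a,5), (a,6)]
    }
}
```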


Scala code

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new FsStateBackend("hdfs://172.18.114.236:8020/backend"))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))
    val stream = env.socketTextStream("host1",9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).flatMap(_.split(","))
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

RocksDBStateBackend

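To use RocksDBStateBackend, first add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>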

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Keep state in RocksDB and write checkpoints to HDFS; true enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB",true));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("host1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token,1)));
            }
        }).keyBy(0).sum(1)
          .print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Package the job and upload it to the test directory of the Flink installation on the server.

Create the HDFS directory

hdfs dfs -mkdir /backend/rocksDB

Edit Flink's flink-conf.yaml, changing or adding the following

state.backend: rocksdb
state.checkpoints.dir: hdfs://172.18.114.236:8020/backend/rocksDB
state.savepoints.dir: hdfs://172.18.114.236:8020/backend/rocksDB
state.backend.incremental: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
state.backend.rocksdb.localdir: /raid/db/flink/checkpoints
state.backend.rocksdb.timer-service.factory: HEAP

Restart Flink, then run

nc -lk 9999

For the first run, submit the job the same way as before

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Enter the same input as before

a,a,a

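Now cancel the submitted job. HDFS again contains a directory whose name is a long string of hex digits.

On one of the cluster nodes (not necessarily the one the job was submitted from), the local RocksDB data files can be found.

RocksDBStateBackend writes the data there first and then flushes it to HDFS.

Start the program again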

./flink run -s hdfs://172.18.114.236:8020/backend/rocksDB/6277c8adfba91c72baa384a0d23581d9/chk-64 -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Now enter

a,a,a

The result is the same as before.

Scala code

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB",true))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))
    val stream = env.socketTextStream("host1",9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).flatMap(_.split(","))
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

Monitoring and tuning

HistoryServer

HistoryServer is used to inspect jobs that have already finished running.

Edit flink-conf.yaml in Flink's conf directory and add the following

jobmanager.archive.fs.dir: hdfs://172.18.114.236:8020/completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://172.18.114.236:8020/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000

Run this in the bin directory

./historyserver.sh start

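Open <public IP>:8082 in a browser to see the web UI. It is empty at first; entries appear once a job has run and finished.

As usual, run a job. After it finishes, its information shows up in the UI.

The archived job information is also visible in HDFS.

The same information is exposed through a REST API that returns JSON. See the official documentation for the available endpoints.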
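As a rough illustration of querying the REST API from code, the sketch below fetches a JSON document with the JDK's `HttpURLConnection`. The class and method names are hypothetical, the host `localhost` is an assumption (use the address of the machine running the HistoryServer), port 8082 comes from the configuration above, and `/jobs/overview` is used here as an example path that should be checked against the documentation of the Flink version in use.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client sketch, not part of Flink.
class HistoryServerClient {

    /** Builds the request URL from host, port and REST path. */
    static String endpoint(String host, int port, String path) {
        return "http://" + host + ":" + port + path;
    }

    /** Performs a GET request and returns the response body as a string. */
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed host; requires a running HistoryServer to return anything.
        String url = endpoint("localhost", 8082, "/jobs/overview");
        System.out.println(get(url));
    }
}
```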
