Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较

Stella981
• 阅读 829

接上篇:Flink FileSink 自定义输出路径——BucketingSink 

上篇使用BucketingSink 实现了自定义输出路径,现在来看看 StreamingFileSink( 据说是StreamingFileSink 是社区优化后添加的connector,推荐使用)

StreamingFileSink 实现起来会稍微麻烦一点(也是灵活,功能更强大),因为可以自己实现序列化方法(源码里面有实例可以参考-复制)

StreamingFileSink 有两个方法可以输出到文件  forRowFormat 和  forBulkFormat,名字差不多代表的方法的含义:行编码格式和块编码格式

forRowFormat 比较简单,只提供了 SimpleStringEncoder 写文本文件,可以指定编码,如下:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val input: DataStream[String] = ...

val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) // 所有数据都写到同一个路径
    .build()
    
input.addSink(sink)

当然我们的主题还是根据输入数据自定义文件输出路径,就需要重写 DayBucketAssigner,如下:

import java.io.IOException
import java.nio.charset.StandardCharsets
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner

class DayBucketAssigner extends BucketAssigner[ObjectNode, String] {

  /**
    * bucketId is the output path
    * @param element
    * @param context
    * @return
    */
  override def getBucketId(element: ObjectNode, context: BucketAssigner.Context): String = {
    //context.currentProcessingTime()
    val day = element.get("date").asText("19790101000000").substring(0, 8)
    // wrap can use day + "/" + xxx
    day
  }

  override def getSerializer: SimpleVersionedSerializer[String] = {

    StringSerializer
  }

  /**
    * 实现参考 : org.apache.flink.runtime.checkpoint.StringSerializer
    */
  object StringSerializer extends SimpleVersionedSerializer[String] {
    val VERSION = 77

    override def getVersion = 77

    @throws[IOException]
    override def serialize(checkpointData: String): Array[Byte] = checkpointData.getBytes(StandardCharsets.UTF_8)

    @throws[IOException]
    override def deserialize(version: Int, serialized: Array[Byte]): String = if (version != 77) throw new IOException("version mismatch")
    else new String(serialized, StandardCharsets.UTF_8)
  }
}

在初始化sink 的时候,指定 BucketAssigner 就可以了

val sinkRow = StreamingFileSink
      .forRowFormat(new Path("D:\\idea_out\\rollfilesink"), new SimpleStringEncoder[ObjectNode]("UTF-8"))
      .withBucketAssigner(new DayBucketAssigner)
     // .withBucketCheckInterval(60 * 60 * 1000l) // 1 hour
      .build()

执行结果如下:

Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较 Flink FileSink 自定义输出路径——StreamingFileSink、BucketingSink 和 StreamingFileSink简单比较

2、 forBulkFormat 和forRowFormat 不太一样,需要自己实现 BulkWriterFactory 和  DayBulkWriter,自定义程度高,可以实现自己的  FSDataOutputStream,写出各种格式的文件(forRowFormat 自定义Encoder  也可以,但是如 forBuckFormat 灵活)

// use define BulkWriterFactory and DayBucketAssinger
    val sinkBuck = StreamingFileSink
      .forBulkFormat(new Path("D:\\idea_out\\rollfilesink"), new DayBulkWriterFactory)
      .withBucketAssigner(new DayBucketAssigner())
      .build()

实现如下:

import java.io.File
import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.util.Preconditions

/**
  * 实现参考 : org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest
  */
class DayBulkWriter extends BulkWriter[ObjectNode] {

  val charset = StandardCharsets.UTF_8
  var stream: FSDataOutputStream = _

  def DayBulkWriter(inputStream: FSDataOutputStream): DayBulkWriter = {
    stream = Preconditions.checkNotNull(inputStream);
    this
  }

  /**
    * write element
    *
    * @param element
    */
  override def addElement(element: ObjectNode): Unit = {
    this.stream.write(element.toString.getBytes(charset))
    // wrapthis.stream.write('\n')
  }

  override def flush(): Unit = {
    this.stream.flush()
  }

  /**
    * output stream is input parameter, just flush, close is factory's job
    */
  override def finish(): Unit = {
    this.flush()
  }

}

/**
  * 实现参考 : org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest.TestBulkWriterFactory
  */
class DayBulkWriterFactory extends BulkWriter.Factory[ObjectNode] {
  override def create(out: FSDataOutputStream): BulkWriter[ObjectNode] = {
    val dayBulkWriter = new DayBulkWriter
    dayBulkWriter.DayBulkWriter(out)

  }
}

执行的结果就不赘述了

又遇到个问题,StreamFileSink 没办法指定输出文件的名字。

BucketingSink 和 StreamingFileSink 的不同

从源码位置来说:

BucketingSink 在 connector 下面,注重输出数据
StreamingFileSink 在api 下面,注重与三方交互

从版本来说:

BucketingSink 比较早就有了
StreamingFileSink 是1.6版本推出的功能(据说是优化后推出的)

从支持的文件系统来说:

BucketingSink     支持Hadoop 文件系统支持的所有文件系统(原文:This connector provides a Sink that writes partitioned files to any filesystem supported by Hadoop FileSystem)
StreamingFileSink 支持Flink FileSystem 抽象文件系统   (原文:This connector provides a Sink that writes partitioned files to filesystems supported by the Flink FileSystem abstraction)

从写数据的方式来说:

BucketingSink     默认的Writer是StringWriter,也提供SequenceFileWriter(字符)
StreamingFileSink 使用 OutputStream +  Encoder 对外写数据 (字节)

从文件滚动策略来说:

BucketingSink     提供了时间、条数滚动 
StreamingFileSink 默认提供时间(官网有说条数,没看到 This is also configurable but the default policy rolls files based on file size and a timeout,自己实现BulkWriter可以)

从目前(1.7.2)来说,BucketingSink 更开箱即用(功能相对简单),StreamingFileSink更麻烦(更灵活、强大)

只是个初学者,还不太能理解 BucketingSink 和 StreamingFileSink 的差异,等了解之后,再来完善

结论:比较推荐使用StreamingFileSink

理由:功能强大,数据刷新时间更快(没有,BucketingSink默认60S的问题,详情见上篇,最后一段)

点赞
收藏
评论区
推荐文章
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Easter79 Easter79
3年前
spring自定义包扫描路径
springboot自定义包扫描路径项目结构(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fblog.csdn.net%2Fqq_26264237%2Farticle%2Fdetails%2F108213759%23_7)注册包路径扫描类(https:
Stella981 Stella981
3年前
Golang Kernel For Jupyter
上篇回顾:VSCodeandNoteBookforJavaScript(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fdotnetcrazy%2Fp%2F9962192.html)正常流程安装Go语言:sudoapti
Stella981 Stella981
3年前
ArcGIS API for JavaScript 4.x 本地部署之Nginx法
上篇ArcGISAPIforJavaScript4.x离线配置之IIS法(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fonsummer%2Fp%2F10217802.html)提到,如何用IIS配置ArcGISjsAPI;
Stella981 Stella981
3年前
Docker Compose搭建mycat读写分离
说明接\上篇\DockerCompose搭建mysql主从复制(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fm.toutiao.com%2Fi6402735880150188545%2F%3Fgroup_id%3D6402736780172574978%26group_
Stella981 Stella981
3年前
Android自定义控件系列
Android自定义控件系列–Path综述项目源码点击查看详情(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fgithub.com%2Fzhaolongs%2FCommonViewApplication)Path中文释义为路径然而它在
Wesley13 Wesley13
3年前
1. 容器化部署一套云服务 第一讲 Jenkins(Docker + Jenkins + Yii2 + 云服务器))
容器化部署一套云服务系列1\.容器化部署一套云服务之Jenkins(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fjackson0714%2Fp%2Fdeploy1.html)一、购买服务器服务器!caeef00
Wesley13 Wesley13
3年前
.NET中使用Redis总结——2.项目实战
接上篇.NET中使用Redis总结——1.Redis搭建(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.cnblogs.com%2Fbigbrid%2Fp%2F6137515.html)看一些Redis相关资料,.NET方面ServiceStack.Redis
Stella981 Stella981
3年前
Excel公式 提取文件路径后缀
原文:Excel公式提取文件路径后缀(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fkybs0%2Fp%2F6062889.html)我们在代码中获取一个文件路径的后缀,是一个很简单的事.如C中,可以通过newFileInfo(fil
Stella981 Stella981
3年前
Flink 异步IO访问外部数据(mysql篇)
  接上篇:【翻译】Flink异步I/O访问外部数据(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2FSpringmoonvenn%2Fp%2F11081201.html)  最近看了大佬的博客,突然想起AsyncI/O方式是Blink