Ignite 与 Spark 都很强,那如果把它们整合起来会怎样?

Stella981
• 阅读 816

前面的文章中,我们分别介绍了 Ignite 和 Spark 这两种技术,从功能上对两者进行了全面深入的对比。经过分析,可以得出这样一个结论:两者都很强大,但是差别很大,定位不同,因此会有不同的适用领域。

但是,这两种技术也是可以互补的,那么它们互补适用于场景是什么呢?主要是这么几个方面:如果觉得 Spark 中的 SQL 等运行速度较慢,那么 Ignite 通过自己的方式提供了对 Spark 应用进行进一步加速的解决方案,这方面可选的解决方案并不多,推荐开发者考虑,另外就是数据和状态的共享,当然这方面的解决方案也有很多,并不是一定要用 Ignite 实现。

Ignite 原生提供了对 Spark 的支持,本文主要探讨为何如何将 Ignite 和 Spark 进行集成。

1.将 Ignite 与 Spark 整合

整合这两种技术会为 Spark 应用带来若干明显的好处:

  • 通过避免大量的数据移动,获得真正可扩展的内存级性能;
  • 提高 RDD、DataFrame 和 SQL 的性能;
  • 在 Spark 作业之间更方便地共享状态和数据。

下图显示了如何整合这两种技术,并且标注了显著的优势: 

Ignite 与 Spark 都很强,那如果把它们整合起来会怎样?

通过该图,可以从整体架构的角度看到 Ignite 在整个 Spark 应用中的位置和作用。

Ignite 对 Spark 的支持主要体现为两个方面,一个是 Ignite RDD,一个是 Ignite DataFrame。本文会首先聚焦于 Ignite RDD,之后再讲讲 Ignite DataFrame。

2.Ignite RDD

Ignite 提供了一个SparkRDD的实现,叫做IgniteRDD,这个实现可以在内存中跨 Spark 作业共享任何数据和状态,IgniteRDD为 Ignite 中相同的内存数据提供了一个共享的、可变的视图,它可以跨多个不同的 Spark 作业、工作节点或者应用,相反,原生的 SparkRDD 无法在 Spark 作业或者应用之间进行共享。

IgniteRDD作为 Ignite 分布式缓存的视图,既可以在 Spark 作业执行进程中部署,也可以在 Spark 工作节点中部署,也可以在它自己的集群中部署。因此,根据预配置的部署模型,状态共享既可以只存在于一个 Spark 应用的生命周期内部(嵌入式模式),也可以存在于 Spark 应用的外部(独立模式)。

Ignite 还可以帮助 Spark 应用提高 SQL 的性能,虽然 SparkSQL 支持丰富的 SQL 语法,但是它没有实现索引。从结果上来说,即使在普通较小的数据集上,Spark 查询也可能花费几分钟的时间,因为需要进行全表扫描。如果使用 Ignite,Spark 用户可以配置主索引和二级索引,这样可以带来上千倍的性能提升。

2.1.IgniteRDD 示例

下面通过一些代码以及创建若干应用的方式,展示 IgniteRDD 带来的好处。

可以使用多种语言来访问 Ignite RDD,这对于有跨语言需求的团队来说有友好的,下边代码共包括两个简单的 Scala 应用和两个 Java 应用。此外,会从两个不同的环境运行应用:从终端运行 Scala 应用以及通过 IDE 运行 Java 应用。另外还会在 Java 应用中运行一些 SQL 查询。

对于 Scala 应用,一个应用会用于往 IgniteRDD 中写入数据,而另一个应用会执行部分过滤然后返回结果集。使用 Maven 将代码构建为一个 jar 文件后在终端窗口中执行这个程序,下面是详细的代码:

object RDDWriter extends App {

    val conf = new SparkConf().setAppName("RDDWriter")

    val sc = new SparkContext(conf)

    val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")

    val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")

    sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))

    ic.close(true)

    sc.stop()

}

object RDDReader extends App {

    val conf = new SparkConf().setAppName("RDDReader")

    val sc = new SparkContext(conf)

    val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")

    val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")

    val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)

    println("The count is " + greaterThanFiveHundred.count())

    ic.close(true)

    sc.stop()

}

在这个 Scala 的RDDWriter中,首先创建了包含应用名的SparkConf,之后基于这个配置创建了SparkContext,最后,根据这个SparkContext创建一个IgniteContext。创建IgniteContext有很多种方法,本例中使用一个叫做example-shared-rdd.xml的 XML 文件,该文件会结合 Ignite 发行版然后根据需求进行预配置。显然,需要根据自己的环境修改路径(Ignite 主目录),之后指定 IgniteRDD 持有的整数值元组,最后,将从 1 到 1000 的整数值存入 IgniteRDD,数值的存储使用了 10 个 parallel 操作。

在这个 Scala 的RDDReader中,初始化和配置与 Scala RDDWriter相同,也会使用同一个 XML 配置文件,应用会执行部分过滤,然后关注存储了多少大于 500 的值,答案最后会输出。

关于IgniteContextIgniteRDD的更多信息,可以看 Ignite 的文档

要构建 jar 文件,可以使用下面的 maven 命令:

mvn clean install

接下来,看下 Java 代码,先写一个 Java 应用往IgniteRDD中写入多个记录,然后另一个应用会执行部分过滤然后返回结果集,下面是RDDWriter的代码细节:

public class RDDWriter {

    public static void main(String args[]) {

        SparkConf sparkConf = new SparkConf().setAppName("RDDWriter").setMaster("local").set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(

            sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        List data = new ArrayList<>(20);

        for (int i = 1001; i <= 1020; i++) {

            data.add(i);

        }

        JavaRDD javaRDD = sparkContext.parallelize(data);

        sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {

            public Tuple2<Integer, Integer> call(Integer val) throws Exception {

                return new Tuple2<Integer, Integer>(val, val);

            }

        }));

        igniteContext.close(true);

        sparkContext.close();

    }

}

在这个 Java 的RDDWriter中,首先创建了包含应用名和执行器数量的SparkConf,之后基于这个配置创建了SparkContext,最后,根据这个SparkContext创建一个IgniteContext。最后,往 IgniteRDD 中添加了额外的 20 个值。

在这个 Java 的RDDReader中,初始化和配置与 Java RDDWriter相同,也会使用同一个 XML 配置文件,应用会执行部分过滤,然后关注存储了多少大于 500 的值,答案最后会输出,下面是 Java RDDReader的代码:

public class RDDReader {

    public static void main(String args[]) {

        SparkConf sparkConf = new SparkConf().setAppName("RDDReader").setMaster("local").set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(

            sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {

            public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {

                return tuple._2() > 500;

            }

        });

        System.out.println("The count is " + greaterThanFiveHundred.count());

        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

        Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");

        df.show();

        igniteContext.close(true);

        sparkContext.close();

    }

}

到这里就可以对代码进行测试了。

2.2.运行应用

在第一个终端窗口中,启动 Spark 的主节点,如下:

$SPARK_HOME/sbin/start-master.sh

在第二个终端窗口中,启动 Spark 工作节点,如下:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

根据自己的环境,修改 IP 地址和端口号(ip:port)。

在第三个终端窗口中,启动一个 Ignite 节点,如下:

$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

这里使用了之前讨论过的example-shared-rdd.xml文件。

在第四个终端窗口中,可以运行 Scala 版的 RDDWriter 应用,如下:

$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

根据自己的环境修改 IP 地址和端口(ip:port),以及 jar 文件的路径(/path_to_jar_file)。

会产生如下的输出:

The count is 500

这是期望的输出。

接下来,杀掉 Spark 的主节点和工作节点,而 Ignite 节点仍然在运行中并且IgniteRDD对于其它应用仍然可用,下面会使用 IDE 通过 Java 应用接入IgniteRDD

运行 Java 版RDDWriter会扩展之前存储于 IgniteRDD 中的记录列表,通过运行 Java 版RDDReader可以进行测试,它会产生如下的输出:

The count is 520

这也是期望的输出。

最后,SQL 查询会在IgniteRDD中执行一个 SELECT 语句,返回范围在 10 到 100 之间的最初 10 个值,输出如下:

Ignite 与 Spark 都很强,那如果把它们整合起来会怎样?

结果正确。

3.IgniteDataframes

Spark 的 DataFrame API 为描述数据引入了模式的概念,Spark 通过表格的形式进行模式的管理和数据的组织。

DataFrame 是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame 等同于关系数据库中的表,并允许 Spark 使用 Catalyst 查询优化器来生成高效的查询执行计划。而 RDD 只是跨集群节点分区化的元素集合。

Ignite 扩展了 DataFrames,简化了开发,改进了将 Ignite 作为 Spark 的内存存储时的数据访问时间,好处包括:

  • 通过 Ignite 读写 DataFrames 时,可以在 Spark 作业之间共享数据和状态;
  • 通过优化 Spark 的查询执行计划加快 SparkSQL 查询,这些主要是通过 IgniteSQL 引擎的高级索引以及避免了 Ignite 和 Spark 之间的网络数据移动实现的。

3.1.IgniteDataframes 示例

下面通过一些代码以及搭建几个小程序的方式,了解如何通过 Ignite DataFrames 整合 Ignite 与 Spark。

一共会写两个 Java 的小应用,然后在 IDE 中运行,还会在这些 Java 应用中执行一些 SQL 查询。

一个 Java 应用会从 JSON 文件中读取一些数据,然后创建一个存储于 Ignite 的 DataFrame,这个 JSON 文件 Ignite 的发行版中已经提供,另一个 Java 应用会从 Ignite 的 DataFrame 中读取数据然后使用 SQL 进行查询。

下面是写应用的代码:

public class DFWriter {

    private static final String CONFIG = "config/example-ignite.xml";

    public static void main(String args[]) {

        Ignite ignite = Ignition.start(CONFIG);

        SparkSession spark = SparkSession.builder().appName("DFWriter").master("local").config("spark.executor.instances", "2").getOrCreate();

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        Dataset peopleDF = spark.read().json(

            resolveIgnitePath("resources/people.json").getAbsolutePath());

        System.out.println("JSON file contents:");

        peopleDF.show();

        System.out.println("Writing DataFrame to Ignite.");

        peopleDF.write().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id").option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated").save();

        System.out.println("Done!");

        Ignition.stop(false);

    }

}

DFWriter中,首先创建了SparkSession,它包含了应用名,之后会使用spark.read().json()读取 JSON 文件并且输出文件内容,下一步是将数据写入 Ignite 存储。下面是DFReader的代码:

public class DFReader {

    private static final String CONFIG = "config/example-ignite.xml";

    public static void main(String args[]) {

        Ignite ignite = Ignition.start(CONFIG);

        SparkSession spark = SparkSession.builder().appName("DFReader").master("local").config("spark.executor.instances", "2").getOrCreate();

        Logger.getRootLogger().setLevel(Level.OFF);

        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        System.out.println("Reading data from Ignite table.");

        Dataset peopleDF = spark.read().format(IgniteDataFrameSettings.FORMAT_IGNITE()).option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG).option(IgniteDataFrameSettings.OPTION_TABLE(), "people").load();

        peopleDF.createOrReplaceTempView("people");

        Dataset sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");

        sqlDF.show();

        System.out.println("Done!");

        Ignition.stop(false);

    }

}

DFReader中,初始化和配置与DFWriter相同,这个应用会执行一些过滤,需求是查找所有的 id > 0 以及 < 6 的人,然后输出结果。

在 IDE 中,通过下面的代码可以启动一个 Ignite 节点:

public class ExampleNodeStartup {

    public static void main(String[] args) throws IgniteException {

        Ignition.start("config/example-ignite.xml");

    }

}

到此,就可以对代码进行测试了。

3.2.运行应用

首先在 IDE 中启动一个 Ignite 节点,然后运行DFWriter应用,输出如下:

Ignite 与 Spark 都很强,那如果把它们整合起来会怎样?

如果将上面的结果与 JSON 文件的内容进行对比,会显示两者是一致的,这也是期望的结果。

下一步运行DFReader,输出如下:

Ignite 与 Spark 都很强,那如果把它们整合起来会怎样?

这也是期望的输出。

4.总结

通过本文,会发现 Ignite 与 Spark 的集成很简单,也可以看到如何从多个环境中使用多个编程语言轻松地访问IgniteRDD。可以对IgniteRDD进行数据的读写,并且即使 Spark 已经关闭状态也能通过 Ignite 得以保持,也看到了通过 Ignite 进行 DataFrame 的读写。读者可以轻松尝试一下。

如果想要这些示例的源代码,可以从这里下载。

作者

李玉珏,架构师,有丰富的架构设计和技术研发团队管理经验,社区技术翻译作者以及撰稿人,开源技术贡献者。Apache Ignite 技术中文文档翻译作者,长期在国内进行 Ignite 技术的推广/技术支持/咨询工作。

本文系作者投稿文章。欢迎投稿。

投稿内容要求

  • 互联网技术相关,包括但不限于开发语言、网络、数据库、架构、运维、前端、DevOps(DevXXX)、AI、区块链、存储、移动、安全、技术团队管理等内容。
  • 文章不需要首发,可以是已经在开源中国博客或网上其它平台发布过的。但是鼓励首发,首发内容被收录可能性较大。
  • 如果你是记录某一次解决了某一个问题(这在博客中占绝大比例),那么需要将问题的前因后果描述清楚,最直接的就是结合图文等方式将问题复现,同时完整地说明解决思路与最终成功的方案。
  • 如果你是分析某一技术理论知识,请从定义、应用场景、实际案例、关键技术细节、观点等方面,对其进行较为全面地介绍。
  • 如果你是以实际案例分享自己或者公司对诸如某一架构模型、通用技术、编程语言、运维工具的实践,那么请将事件相关背景、具体技术细节、演进过程、思考、应用效果等方面描述清楚
  • 其它未尽 case 具体情况具体分析,不虚的,文章投过来试试先,比如我们并不拒绝就某个热点事件对其进行的报导、深入解析。

投稿方式

重要说明

  • 作者需要拥有所投文章的所有权,不能将别人的文章拿过来投递。
  • 投递的文章需要经过审核,如果开源中国编辑觉得需要的话,将与作者一起进一步完善文章,意在使文章更佳、传播更广。
  • 文章版权归作者所有,开源中国获得文章的传播权,可在开源中国各个平台进行文章传播,同时保留文章原始出处和作者信息,可在官方博客中标原创标签。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这