Spark DataFrame列的合并与拆分

Stella981
• 阅读 1882

版本说明:Spark-2.3.0

使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。

1 DataFrame列数据的合并
例如:我们有如下数据,想要将三列数据合并为一列,并以“,”分割

+----+---+-----------+
|name|age|      phone|
+----+---+-----------+
|Ming| 20|15552211521|
|hong| 19|13287994007|
| zhi| 21|15552211523|
+----+---+-----------+

1.1 使用map方法重写

使用map方法重写就是将DataFrame使用map取值之后,然后使用toSeq方法转成Seq格式,最后使用Seq的foldLeft方法拼接数据,并返回,如下所示:

//方法1:利用map重写
    val separator = ","
    df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

1.2 使用内置函数concat_ws

合并多列数据也可以使用SparkSQL的内置函数concat_ws()

//方法2: 使用内置函数 concat_ws
    import org.apache.spark.sql.functions._
    df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

1.3 使用自定义UDF函数

自己编写UDF函数,实现多列合并

 //方法3:使用自定义UDF函数

    // 编写udf函数
    def mergeCols(row: Row): String = {
      row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
    df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()

完整代码:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StringType

/**
  * Created by shirukai on 2018/9/12
  * DataFrame 合并列
  */
object MergeColsTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    //从内存创建一组DataFrame数据
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L))
      .toDF("name", "age", "phone")
    df.show()
    /**
      * +----+---+-----------+
      * |name|age|      phone|
      * +----+---+-----------+
      * |Ming| 20|15552211521|
      * |hong| 19|13287994007|
      * | zhi| 21|15552211523|
      * +----+---+-----------+
      */
    //方法1:利用map重写
    val separator = ","
    df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */
    //方法2: 使用内置函数 concat_ws
    import org.apache.spark.sql.functions._
    df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */
    //方法3:使用自定义UDF函数

    // 编写udf函数
    def mergeCols(row: Row): String = {
      row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
    df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()

    /**
      * /**
      * * +-------------------+
      * * |              value|
      * * +-------------------+
      * * |Ming,20,15552211521|
      * * |hong,19,13287994007|
      * * | zhi,21,15552211523|
      * * +-------------------+
      **/
      */
  }
}

2 DataFrame列数据的拆分

上面我们将DataFrame的多列数据合并为一列如下所示,有时候我们也需要将单列数据,以某种拆分规则,拆分为多列。下面提供几种将一列拆分为多列的方法。

+-------------------+
|              value|
+-------------------+
|Ming,20,15552211521|
|hong,19,13287994007|
| zhi,21,15552211523|
+-------------------+

2.1 使用内置函数split,然后遍历添加列

该方法,先利用内置函数split将单列的数据拆分,然后遍历使用getItem(角标)方法获取拆分后的数据,依次使用withColumn方法添加新列,代码如下所示:

  //方法1: 使用内置函数split,然后遍历添加列
    val separator = ","
    lazy val first = df.first()

    val numAttrs = first.toString().split(separator).length
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()
  /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+

2.2 使用UDF函数创建多列数据,然后合并
该方法是使用udf函数,生成多个列,然后合并到原来的数据。该方法参考了VectorDisassembler(与spark ml官网提供的VectorAssembler相反),这是一个第三方的spark ml向量拆分算法,该方法github地址:https://github.com/jamesbconner/VectorDisassembler。代码如下所示:

//方法2:使用udf函数创建多列,然后合并
    val attributes: Array[Attribute] = {
      val numAttrs = first.toString().split(separator).length
      //生成attributes
      Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
    }
    //创建多列数据
    val fieldCols = attributes.zipWithIndex.map(x => {
      val assembleFunc = udf {
        str: String =>
          str.split(separator)(x._2)
      }
      assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
    })
    //合并数据
    df.select(col("*") +: fieldCols: _*).show()

    /**
      * +-------------------+-------+-------+-----------+
      * |              value|value_0|value_1|    value_2|
      * +-------------------+-------+-------+-----------+
      * |Ming,20,15552211521|   Ming|     20|15552211521|
      * |hong,19,13287994007|   hong|     19|13287994007|
      * | zhi,21,15552211523|    zhi|     21|15552211523|
      * +-------------------+-------+-------+-----------+
      */

完整代码:

import org.apache.spark.ml.attribute.{Attribute, NumericAttribute}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

/**
  * Created by shirukai on 2018/9/12
  * 拆分列
  */
object SplitColTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    //从内存中创建DataFrame
    import spark.implicits._
    val df = Seq("Ming,20,15552211521", "hong,19,13287994007", "zhi,21,15552211523")
      .toDF("value")
    df.show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

    import org.apache.spark.sql.functions._
    //方法1: 使用内置函数split,然后遍历添加列
    val separator = ","
    lazy val first = df.first()

    val numAttrs = first.toString().split(separator).length
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()

    /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+
      */

    //方法2:使用udf函数创建多列,然后合并
    val attributes: Array[Attribute] = {
      val numAttrs = first.toString().split(separator).length
      //生成attributes
      Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
    }
    //创建多列数据
    val fieldCols = attributes.zipWithIndex.map(x => {
      val assembleFunc = udf {
        str: String =>
          str.split(separator)(x._2)
      }
      assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
    })
    //合并数据
    df.select(col("*") +: fieldCols: _*).show()

    /**
      * +-------------------+-------+-------+-----------+
      * |              value|value_0|value_1|    value_2|
      * +-------------------+-------+-------+-----------+
      * |Ming,20,15552211521|   Ming|     20|15552211521|
      * |hong,19,13287994007|   hong|     19|13287994007|
      * | zhi,21,15552211523|    zhi|     21|15552211523|
      * +-------------------+-------+-------+-----------+
      */
  }
}
点赞
收藏
评论区
推荐文章
kenx kenx
3年前
MySQL查询结果集字符串操作之多行合并与单行分割
前言我们在做项目写sql语句的时候,是否会遇到这样的场景,就是需要把查询出来的多列,按照字符串分割合并成一列显示,或者把存在数据库里面用逗号分隔的一列,查询分成多列呢,常见场景有,文章标签,需要吧查询多个标签合并成一列,等,需要怎么去实现呢,这就涉及到MySQL的字符串操作groupconcat场景再现我想把查询多列数据合并成一列显示用逗号分隔
Wesley13 Wesley13
3年前
java架构之路
说一下mysql比较宏观的面试,具体咋写sql的这里就不过多举例了。后面我还会给出一个关于mysql面试优化的试题,这里主要说的索引和BTree结构,很少提到我们的集群配置优化方案。1.索引是什么?有什么作用以及缺点  答:索引是对数据库表中一列或多列的值进行排序的一种结构,使用索引可快速访问数据库表中的特定信息。也可以理解为索引就
Stella981 Stella981
3年前
Excel数据转化为sql脚本
在实际项目开发中,有时会遇到客户让我们把大量Excel数据导入数据库的情况。这时我们就可以通过将Excel数据转化为sql脚本来批量导入数据库。1在数据前插入一列单元格,用来拼写sql语句。 具体写法:"insertintot\_student(id,name,age,class)value("&B2&",'"&C2&"',"&D2&"
Wesley13 Wesley13
3年前
95%的人都不知道 MySQL还有索引管理与执行计划
1.1索引的介绍  索引是对数据库表中一列或多列的值进行排序的一种结构,使用索引可快速访问数据库表中的特定信息。如果想按特定职员的姓来查找他或她,则与在表中搜索所有的行相比,索引有助于更快地获取信息。  索引的一个主要目的就是加快检索表中数据的方法,亦即能协助信息搜索者尽快的找到符合限制条件的记录ID的辅助数据结构。!fi
Stella981 Stella981
3年前
Python之DataFrame更改列名及重排列顺序
日常在处理数据的时候,经常需要对dataframe进行重排,只取其中几列或者更改列名等操作;有两个相似的方法reindex和rename,与此记录一下常见的用法,并标注一下区别:rename:重命名,就是对col列进行命名的修改,他只改变col的名字,相当于起了个别名,原来叫col1,以后叫col2,inplaceTrue,用来保存更改,即更改了原
Wesley13 Wesley13
3年前
mysql——GROUP BY和HAVING
GROUPBY语法可以根据给定数据列的每个成员对查询结果进行分组统计,最终得到一个分组汇总表。select子句中的列名必须为分组列或列函数,列函数对于groupby子句定义的每个组返回一个结果。某个员工信息表结构和数据如下:  id  name  dept  salary  edlevel     hiredate   1  张
Stella981 Stella981
3年前
Codeforces997C Sky Full of Stars 【FMT】【组合数】
题目大意:一个$n\n$的格子,每个格子由你填色,有三种允许填色的方法,问有一行或者一列相同的方案数。题目分析:标题的FMT是我吓人用的。一行或一列的问题不好解决,转成它的反面,没有一行和一列相同的方案数。从一个方向入手,比如列,把一列看成一个整体。把颜色看成二进制数,$001$,$010$,$100$。那么一列构成了一个长度为$3
Wesley13 Wesley13
3年前
MySQL 索引(3)
什么是索引?索引是对数据库表中一列或多列的值进行排序的一种结构,使用索引可快速访问数据库表中的特定信息。比如想从字典中查询某一个字,我们可以通过偏旁、或者拼音来快速定位到要找的页码,这种方式也可以被理解为一种索引。Mysql常用的索引类型类型说明Normal(普通)普通索引,没任何限制。Unique(唯
Wesley13 Wesley13
3年前
mysql——索引——概念
一、索引索引由数据库表中一列或者多列组合而成,其作用是提高对表中数据的查询速度。索引是创建在表上面的,是对数据表中一列或者多列的值进行排序的一种结构。通过索引,查询数据时可以不必读完记录的所有信息,而只是查询索引列。索引优点:提高检
Wesley13 Wesley13
3年前
Java面试通关要点汇总集之核心篇参考答案
核心篇数据存储MySQL索引使用的注意事项1.索引不会包含有NULL的列只要列中包含有NULL值,都将不会被包含在索引中,复合索引中只要有一列含有NULL值,那么这一列对于此符合索引就是无效的。2.使用短索引