
Spark SQL dropDuplicates

Source: 微智科技网

Deduplicating data in Spark SQL

To remove duplicate rows from a DataFrame in Spark SQL, you can use the dropDuplicates() method.

dropDuplicates() has four overloads.

The first: def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)

This overload takes no arguments. It deduplicates on all columns by default, keeping the first occurrence of each row in row order. Its source:

/**
 * Returns a new Dataset that contains only the unique rows from this Dataset.
 * This is an alias for `distinct`.
 *
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 *
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
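For example (a minimal sketch of my own; it assumes a SparkSession named `spark` is already in scope, as in spark-shell, and the column names and data are made up):

```scala
import spark.implicits._

val df = Seq(
  ("alice", 1),
  ("alice", 1),  // exact duplicate row
  ("alice", 2)   // same name but different value: not a duplicate across all columns
).toDF("name", "value")

// No arguments: every column is compared, so only the exact duplicate is dropped.
val deduped = df.dropDuplicates()
// deduped holds two rows: ("alice", 1) and ("alice", 2)
```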

The second: def dropDuplicates(colNames: Seq[String])

This overload takes a sequence in which you name the columns whose values should drive the deduplication; again, the first occurrence of each row is kept. Its source:

/**
 * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
 * the subset of columns.
 *
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 *
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val allColumns = queryExecution.analyzed.output
  val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      throw new AnalysisException(
        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
    }
    cols
  }
  Deduplicate(groupCols, planWithBarrier)
}
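To illustrate the subset-of-columns behavior (again a sketch of my own, assuming a SparkSession `spark` as in spark-shell; the data is made up):

```scala
import spark.implicits._

val df = Seq(
  ("alice", 1),
  ("alice", 2),  // same "name", different "value"
  ("bob",   3)
).toDF("name", "value")

// Only "name" is compared, so one row per distinct name survives,
// even though the full rows differ in "value".
val byName = df.dropDuplicates(Seq("name"))
```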

The third: def dropDuplicates(colNames: Array[String])

This overload takes an array; the method converts it to a sequence and then calls the second overload. Its source:

/**
 * Returns a new Dataset with duplicate rows removed, considering only
 * the subset of columns.
 *
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 *
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)

The fourth: def dropDuplicates(col1: String, cols: String*)

This overload takes column names as plain strings; the method body combines them into a sequence and calls the second overload. Its source:

/**
 * Returns a new [[Dataset]] with duplicate rows removed, considering only
 * the subset of columns.
 *
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 *
 * @group typedrel
 * @since 2.0.0
 */
@scala.annotation.varargs
def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
  val colNames: Seq[String] = col1 +: cols
  dropDuplicates(colNames)
}

The third and fourth overloads ultimately call the second one, so when you need to deduplicate by specific columns you can simply pass in a Seq. The first overload, which deduplicates on all columns by default, also calls the second one, passing this.columns, i.e. a Seq made up of all the columns.
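A minimal sketch of that equivalence (my own example, assuming a SparkSession `spark` as in spark-shell; column names and data are made up):

```scala
import spark.implicits._

val df = Seq(("alice", 1), ("alice", 2), ("bob", 3)).toDF("name", "value")

// These three calls all reach dropDuplicates(colNames: Seq[String]):
val bySeq     = df.dropDuplicates(Seq("name"))    // second overload, called directly
val byArray   = df.dropDuplicates(Array("name"))  // third: Array -> toSeq -> second
val byVarargs = df.dropDuplicates("name")         // fourth: col1 +: cols -> second

// The no-argument form passes this.columns (all columns) to the same method;
// here all three rows differ in some column, so nothing is dropped.
val byAll = df.dropDuplicates()
```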

So if you want to dig into the core deduplication code behind dropDuplicates(), the second overload is the only one you need to study. When I have time I will extend this post with a walkthrough of that core source.

A pitfall of dropDuplicates()!

When using dropDuplicates() I have found that duplicate rows sometimes still appear in the result. My guesses as to why:

The data lives on multiple executors. Spark computes in a distributed fashion, so the data is spread across different executors during computation; when dropDuplicates() runs, it may be that only the data within a single executor gets deduplicated, while other executors still hold duplicates of those rows.

The data lives in different partitions. Likewise, because Spark is distributed, the data is scattered across partitions during computation, and after dropDuplicates() different partitions may still contain identical rows.

I tried running with a single executor and multiple partitions and saw no duplicates; with multiple executors, even merging the data into one partition before deduplicating still left duplicates. So the first guess looks the more likely of the two. But running on a single executor throws away the whole point, and the advantages, of distributed computing, so some other approach is needed. If you have a good solution, feel free to share it in the comments!
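For what it's worth, one thing sometimes tried in this situation (an untested sketch of my own, not something verified against the experiments above; the `id` key column and data are made up, and a SparkSession `spark` is assumed as in spark-shell) is to explicitly co-locate rows sharing the dedup key before dropping duplicates:

```scala
import spark.implicits._
import org.apache.spark.sql.functions.col

val df = Seq((1, "a"), (1, "a"), (2, "b")).toDF("id", "value")

// Hash-partition by the dedup key so all rows with the same "id" land in the
// same partition, then deduplicate on that key.
val deduped = df.repartition(col("id")).dropDuplicates(Seq("id"))
```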
