Partition a Spark DataFrame based on a specific column and dump the content of each partition to a CSV


I'm using the Spark 1.6.2 Java API to load some data into a DataFrame DF1 that looks like:

Key  Value
A    v1
A    v2
B    v3
A    v4
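
For context, a minimal sketch of how a DataFrame with this shape could be built for testing with the Spark 1.6 Java API; the local master, the SQLContext setup and the hard-coded rows are illustrative assumptions and not part of the original question, whose real loading code is only hinted at below as "<some operations>":

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class BuildDF1 {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("partition-example").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Schema matching the table above: two string columns, "Key" and "Value"
    StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("Key", DataTypes.StringType, false),
        DataTypes.createStructField("Value", DataTypes.StringType, false)));

    // Toy rows matching the example data
    DataFrame DF1 = sqlContext.createDataFrame(
        sc.parallelize(Arrays.asList(
            RowFactory.create("A", "v1"),
            RowFactory.create("A", "v2"),
            RowFactory.create("B", "v3"),
            RowFactory.create("A", "v4"))),
        schema);

    DF1.show();
  }
}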

Now I need to partition DF1 based on a subset of the values in column "Key" and dump each partition to a CSV file (using spark-csv).

Desired Output:

A.csv

Key Value
A   v1
A   v2
A   v4

B.csv

Key Value
B   v3

At the moment what I'm doing is building a HashMap (myList) containing the subset of values I need to filter on, and then iterating over it, filtering on a different Key in each iteration. With the following code I get what I want, but I'm wondering whether there is a more efficient way to do it:

DF1 = <some operations>.cache();

// myList maps each key to keep to a filter expression (presumably a condition string such as "Key = 'A'")
for (Object filterKey : myList.keySet()) {
  DF2 = DF1.filter((String) myList.get(filterKey));

  DF2.write().format("com.databricks.spark.csv")
     .option("header", "true")
     .save("/" + filterKey + ".csv");
}

1 solution

#1



You are almost there; you just need to add partitionBy, which will partition the files the way you want.

import org.apache.spark.sql.functions.col

DF1
  // keep only rows whose Key appears in myList (assumed here to be a Scala Map keyed by the wanted values)
  .filter(col("Key").isin(myList.keySet.toSeq: _*))
  .write
  .partitionBy("Key")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/my/basepath/")

The files will now be stored under "/my/basepath/Key=A/", "/my/basepath/Key=B/", and so on.
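
Since the question uses the Java API, the same filter-then-partitionBy approach could look roughly like this in Java, reusing DF1 and myList from the question's code; the use of Column.isin over myList's key set is an assumption about how the wanted keys are held, and the base path is the same placeholder as above:

import static org.apache.spark.sql.functions.col;

// Collect the keys we want to keep (assumed to be the keys of myList, e.g. "A", "B")
Object[] wantedKeys = myList.keySet().toArray();

DF1
    // keep only rows whose Key is in the wanted subset
    .filter(col("Key").isin(wantedKeys))
    .write()
    // one sub-directory per distinct Key value, e.g. Key=A, Key=B
    .partitionBy("Key")
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("/my/basepath/");

Note that each Key=... directory will contain one or more part-* files rather than a single A.csv or B.csv, so if the exact file names matter the output would still need to be renamed afterwards.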

