Write to multiple outputs by key in Spark - one Spark job

Tags: Scala, Hadoop, Output, HDFS, Apache Spark

Scala Problem Overview


How can you write to multiple outputs, dependent on the key, using Spark in a single job?

Related: https://stackoverflow.com/questions/23994383/write-to-multiple-outputs-by-key-scalding-hadoop-one-mapreduce-job/

E.g.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

would ensure cat prefix/1 is

a
b

and cat prefix/2 would be

c

EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.

Scala Solutions


Solution 1 - Scala

If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)

If you're starting out with an RDD, you'll first need to convert it to a DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

In Python, this same code is:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:

people_df.write.partitionBy("number").text("people")

And you can easily use other output formats if you want:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
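
A nice bonus is that the partitioned layout reads straight back in: partition discovery turns the number= directories into a column again when you load the root path. A minimal sketch (assuming a Spark 1.6+ shell where sqlContext is in scope):

// Partition discovery recovers "number" as a column from the directory names
val people = sqlContext.read.text("people") // columns: value, number
people.filter("number = 1").show()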

Solution 2 - Scala

I would do it like this, which is scalable:

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()
  
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

I just saw a similar answer above, but actually we don't need custom partitioners. MultipleTextOutputFormat will create a file for each key, so it is fine for multiple records with the same key to fall into the same partition.

new HashPartitioner(num), where num is the number of partitions you want. In case you have a big number of distinct keys, you can set num large; that way each partition will not have to open too many HDFS file handles.
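
For intuition, HashPartitioner just buckets keys by hashCode modulo the partition count; a quick sketch:

import org.apache.spark.HashPartitioner

// Several keys may share a bucket; MultipleTextOutputFormat still splits
// them into separate files, since file names come from the keys themselves.
val p = new HashPartitioner(3)
Seq(1, 2, 7).foreach(k => println(s"key $k -> partition ${p.getPartition(k)}"))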

Solution 3 - Scala

If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(Replace PrintWriter with your choice of distributed filesystem operation.)

This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.
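
For example, here is a minimal sketch of an HDFS-backed variant (a hypothetical HdfsMultiWriter; it assumes the executors can see the Hadoop configuration and keeps the same output/key/suffix layout as above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

class HdfsMultiWriter(suffix: String) {
  // FileSystem.get picks up fs.defaultFS from the ambient Hadoop configuration
  private lazy val fs = FileSystem.get(new Configuration())
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any): Unit = {
    // fs.create makes the parent directories as needed
    val w = writers.getOrElseUpdate(key,
      new java.io.PrintWriter(fs.create(new Path("output/" + key + "/" + suffix))))
    w.println(value)
  }
  def close(): Unit = writers.values.foreach(_.close())
}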

Solution 4 - Scala

This includes the compression codec, the necessary imports, and the pimp (implicit class), as requested in the question.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

One subtle difference to the OP is that it will prefix <keyName>= to the directory names. E.g.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Would give:

prefix/key=1/part-00000
prefix/key=2/part-00000

where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.

And

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Would give:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

It should be clear how to adapt this for parquet.
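
For instance, a sketch (Parquet manages compression through its own settings, so the text codec option drops out):

rdd.toDF(keyName, "_2").write.partitionBy(keyName).parquet(prefix)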

Finally, below is an example for Dataset, which is perhaps nicer than using tuples.

import org.apache.spark.sql.Dataset

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}

Solution 5 - Scala

I had a similar need and found a way. But it has one drawback (which was not a problem in my case): you need to re-partition your data, with one partition per output file.

To partition this way, you generally need to know beforehand how many files the job will output, and to find a function that maps each key to its partition.

First let's create our MultipleTextOutputFormat-based class:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

With this class, Spark will take a key from the partition (the first or the last one, I guess) and name the file after it, so it's not good to mix multiple keys in the same partition.

For your example, you will require a custom partitioner. This will do the job:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
    case other => throw new IllegalArgumentException( // fail loudly instead of a bare MatchError
      s"Unexpected key: $other")
  }
}

Now let's put everything together:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

This will generate 3 files under prefix (named 1, 2 and 7), processing everything in one pass.

As you can see, you need some knowledge about your keys to be able to use this solution.

For me it was easier because I needed one output file for each key hash and the number of files was under my control, so I could use the stock HashPartitioner to do the trick.
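
A sketch of that hash-based variant, reusing KeyBasedOutput from above (note the caveat: KeyBasedOutput names files after keys, so getting strictly one file per hash bucket would need an output format that names files after the bucket instead):

import org.apache.spark.HashPartitioner

// Fix the number of partitions up front; keys are bucketed by hashCode.
val hashedRDD = rdd.partitionBy(new HashPartitioner(10))
hashedRDD.saveAsHadoopFile(prefix,
  classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])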

Solution 6 - Scala

I was in need of the same thing in Java. Posting my translation of Zhang Zhan's Scala answer to Spark Java API users:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}

Solution 7 - Scala

saveAsTextFile() and saveAsHadoopFile(...) are implemented based on the RDD data, specifically through the method PairRDDFunctions.saveAsHadoopDataset, which takes the data from the PairRDD on which it is executed. I see two possible options. If your data is relatively small in size, you could save some implementation time by grouping the RDD by key, creating a new RDD from each collection, and using that RDD to write the data. Something like this:

val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map { case (k, v) => k -> sc.makeRDD(v.toSeq) }
rddByKey.foreach { case (k, rdd) => rdd.saveAsTextFile(prefix + k) }

Note that it will not work for large datasets, because the materialization of the iterator at v.toSeq might not fit in memory.

The other option I see, and actually the one I'd recommend in this case, is to roll your own by directly calling the hadoop/hdfs api.

Here's a discussion I started while researching this question: How to create RDDs from another RDD?

Solution 8 - Scala

I had a similar use case where I split the input file on Hadoop HDFS into multiple files based on a key (one file per key). Here is my Scala code for Spark:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object processGroup extends Serializable {
    def apply(groupName: String, records: Iterable[String]): Unit = {
        // Build the FileSystem handle here, on the executor: Hadoop's
        // Configuration is not serializable, so it must not be captured
        // from the driver.
        val fs = FileSystem.get(new Configuration())
        val outFileStream = fs.create(new Path("/output_dir/" + groupName))
        for (line <- records) {
            // write raw UTF-8 bytes; writeUTF would prepend a 2-byte length header
            outFileStream.write((line + "\n").getBytes("UTF-8"))
        }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy(_.split(",")(0))
dateGrouped.foreach(x => processGroup(x._1, x._2))

I have grouped the records by key. The values for each key are written to a separate file.

Solution 9 - Scala

Good news for Python users: in the case where you have multiple columns and you want to save all the other (non-partition) columns in CSV format, the "text" method from Nick Chammas' suggestion will fail.

people_df.write.partitionBy("number").text("people")

The error message is "AnalysisException: u'Text data source supports only a single column, and you have 2 columns.;'"

In Spark 2.0.0 (my test environment is HDP's Spark 2.0.0) the package "com.databricks.spark.csv" is now integrated, and it allows us to save a text file partitioned by just one column. See the example below:

people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[root@namenode people]# tree
.
├── number=1
│   └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│   └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie

In my Spark 1.6.1 environment the code didn't throw any error, however only one file was generated; the output was not partitioned into two folders.
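
As a side note, Spark 2.x also exposes CSV as a built-in short name, so the databricks format string can most likely be dropped (a sketch; the call reads the same in Python and Scala):

df.coalesce(1).write.partitionBy("number").mode("overwrite").csv("people")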

Hope this helps.

Solution 10 - Scala

I had a similar use case. I resolved it in Java by writing two custom classes implementing MultipleTextOutputFormat and RecordWriter.

My input was a JavaPairRDD<String, List<String>> and I wanted to store it in a file named by its key, with all the lines contained in its value.

Here is the code for my MultipleTextOutputFormat implementation

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only overridden for visibility purposes
        (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job, String name) {
        return super.getInputFileBasedOutputFileName(job, name);
    }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    public RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
        final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 

Here is the code for my RecordWriter implementation.

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<>();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    public void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    public void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

Most of the code is exactly the same as in FileOutputFormat. The only difference is these few lines:

List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

These lines allow me to write each line of my input List<String> to the file. The first argument of the write function is set to null to avoid writing the key on each line.

To finish, I only need to make this call to write my files:

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | samthebest | View Question on Stackoverflow
Solution 1 - Scala | Nick Chammas | View Answer on Stackoverflow
Solution 2 - Scala | zhang zhan | View Answer on Stackoverflow
Solution 3 - Scala | Daniel Darabos | View Answer on Stackoverflow
Solution 4 - Scala | samthebest | View Answer on Stackoverflow
Solution 5 - Scala | douglaz | View Answer on Stackoverflow
Solution 6 - Scala | Thamme Gowda | View Answer on Stackoverflow
Solution 7 - Scala | maasg | View Answer on Stackoverflow
Solution 8 - Scala | shanmuga | View Answer on Stackoverflow
Solution 9 - Scala | dalin qin | View Answer on Stackoverflow
Solution 10 - Scala | jeanr | View Answer on Stackoverflow