spark – partition

By default, a partition is created for each HDFS block of the input file, and an HDFS block is 64 MB by default (from Spark’s Programming Guide).

RDDs get partitioned automatically without programmer intervention. However, there are times when you’d like to adjust the size and number of partitions or the partitioning scheme according to the needs of your application.

You can use the getPartitions: Array[Partition] method on an RDD to get the set of partitions it consists of.
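For a quick check from PySpark, a minimal sketch (the RDD here is just an example) is to use getNumPartitions, which reports the same count:

rdd = sc.parallelize(range(100))
print(rdd.getNumPartitions())        # number of partitions of this RDD
print(len(rdd.glom().collect()))     # same count: glom gives one list per partition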

When a stage executes, you can see the number of partitions for a given stage in the Spark UI.

The reason for 8 tasks in total is that I’m on an 8-core laptop and, by default, the number of partitions is the number of available cores.

You can request a minimum number of partitions using the second input parameter of many of the calls that create RDDs, e.g. sc.textFile and sc.parallelize.
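A minimal sketch of both (demo.txt is a hypothetical file):

rdd1 = sc.parallelize(range(1000), 20)   # numSlices = 20 partitions
rdd2 = sc.textFile('demo.txt', 20)       # minPartitions = 20; the actual count may be higher
print(rdd1.getNumPartitions(), rdd2.getNumPartitions())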

Spark can only run 1 concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions (and probably 2-3x that).

As far as choosing a “good” number of partitions, you generally want at least as many as the number of executors for parallelism. You can get this computed value by calling sc.defaultParallelism.
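A sketch of using that value to size an RDD (the 3x factor is only the rule of thumb above, not a Spark default):

print(sc.defaultParallelism)                                    # e.g. 8 on an 8-core laptop
rdd = sc.parallelize(range(10**6), sc.defaultParallelism * 3)   # 2-3x the number of cores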

Also, the number of partitions determines how many files get generated by actions that save RDDs to files.
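For example, saveAsTextFile writes one part-NNNNN file per partition (the output paths here are hypothetical):

rdd = sc.parallelize(range(1000), 8)
rdd.saveAsTextFile('out_8')                 # part-00000 … part-00007
rdd.coalesce(1).saveAsTextFile('out_1')     # a single part-00000 file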

The maximum size of a partition is ultimately limited by the available memory of an executor.

In the first RDD transformation, e.g. reading from a file using sc.textFile(path, partition), the partition parameter (the requested minimum number of partitions) is applied to all further transformations and actions on this RDD.
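A sketch of that propagation (using the same placeholder path as below):

rdd = sc.textFile("hdfs://…/file.txt", 10)                  # request at least 10 partitions
mapped = rdd.map(lambda line: line.upper())
print(rdd.getNumPartitions(), mapped.getNumPartitions())    # narrow transformations keep the count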

Partitions get redistributed among nodes whenever a shuffle occurs. Repartitioning may cause a shuffle to occur in some situations, but it is not guaranteed to happen in all cases, and it usually happens during the action stage.

When creating an RDD by reading a file using rdd = SparkContext().textFile("hdfs://…/file.txt") the number of partitions may be smaller. Ideally, you would get the same number of blocks as you see in HDFS, but if the lines in your file are too long (longer than the block size), there will be fewer partitions.

The preferred way to set the number of partitions for an RDD is to pass it directly as the second input parameter in the call, e.g. rdd = sc.textFile("hdfs://…/file.txt", 400), where 400 is the number of partitions. In this case, the 400 splits are computed by Hadoop’s TextInputFormat, not by Spark, which makes it much faster. The code also spawns 400 concurrent tasks to load file.txt directly into 400 partitions.

It will only work as described for uncompressed files.

When using textFile with compressed files (file.txt.gz instead of file.txt, and similar formats), Spark disables splitting, which results in an RDD with only 1 partition (reads against gzipped files cannot be parallelized). In this case, to change the number of partitions you have to repartition.


Some operations, e.g. map, flatMap, filter, don’t preserve partitioning.


The map, flatMap, and filter operations apply their function within every partition, so the resulting RDD keeps the same number of partitions.
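You can see both effects from PySpark: the partition count survives map, but a partitioner set by partitionBy does not (a minimal sketch):

pairs = sc.parallelize([(1, 'a'), (2, 'b'), (3, 'c')]).partitionBy(4)
print(pairs.partitioner is not None)                                   # True
print(pairs.map(lambda kv: kv).partitioner)                            # None – map drops the partitioner
print(pairs.mapValues(lambda v: v.upper()).partitioner is not None)    # True – mapValues preserves it
print(pairs.map(lambda kv: kv).getNumPartitions())                     # still 4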

Repartitioning RDD — repartition Transformation

repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

repartition is coalesce with numPartitions and shuffle enabled.
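In PySpark terms, these two calls are roughly equivalent (rdd is any existing RDD):

rdd.repartition(5)
rdd.coalesce(5, shuffle=True)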

With the following computation you can see that repartition(5) causes 5 tasks to be started using NODE_LOCAL data locality.
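That computation isn’t reproduced here; a minimal sketch that triggers the same behavior would be:

rdd = sc.parallelize(range(10**6))
rdd.repartition(5).count()    # the final stage runs 5 tasks, visible in the Spark UI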

Please note that Spark disables splitting for compressed files and creates RDDs with only 1 partition. In such cases, it’s helpful to use sc.textFile('demo.gz') and do repartitioning using rdd.repartition(100) as follows:

rdd = sc.textFile('demo.gz')     # gzipped input, so a single partition
rdd = rdd.repartition(100)       # shuffle the data into 100 partitions

With these lines, you end up with rdd having exactly 100 partitions of roughly equal size.

  • rdd.repartition(N) does a shuffle to split the data into N partitions

    • partitioning is done on a round-robin basis (see the sketch after this list)
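A quick way to check the result (reusing demo.gz from above; collect only on small files):

rdd = sc.textFile('demo.gz').repartition(100)
print(rdd.getNumPartitions())               # 100
print(rdd.glom().map(len).collect())        # per-partition record counts, roughly equal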
