Problem Definition
Divide a Spark RDD into N (user-defined) sub-parts and apply the same function to each sub-part, with each sub-part holding a different subset of the data.
Explanation
I have an RDD. I want to divide it into sub-parts; let's call them partitions. On each partition I then want to perform the same task for multiple iterations.
Here is code that illustrates this:
import scala.util.Random

// One element of the data set: position X, step Y, current and candidate loss
case class shape(dim: Int) {
  val random = new Random()
  var X: Array[Double] = Array.fill(dim)(random.nextDouble() * (100 - 10) + 11)
  var Y: Array[Double] = Array.fill(dim)(math.random)
  var Loss: Double = math.random
  var NewLoss: Double = math.random
}
val N = 1000 // number of elements; in the real run N will be in the millions
val d = 100 // dimension of each element; also much larger in the real run
val nP = 4 // number of sub-parts (partitions) into which the RDD is divided
val iterations = 1000 // in the real analysis this will be in the millions or billions
val list = List.fill(N)(new shape(d))
list.foreach { x =>
  x.Loss = SphereFunc(x.X) // update the loss of each element
}
val rdd = sc.parallelize(list, nP)
var partitioned = rdd.persist()
for(iter <- 1 to iterations) {
partitioned = partitioned.mapPartitionsWithIndex { (k, iterator ) =>
val li = iterator.toList
val localBest = li.minBy(_.Loss).X
li.map { j =>
j.Y = ((j.X, localBest).zipped.map(_ - _).map(_ * math.random), j.Y).zipped.map(_ + _)
j.X = (j.X, j.Y).zipped.map(_ + _)
}
li.filter(math.random > _.Loss)
.map { j =>
j.X = localBest.map(_ + math.random)
}
li.foreach { j => j.NewLoss = SphereFunc(j.X) }
li.filter(j => math.random < j.NewLoss && j.NewLoss < j.Loss).foreach { j =>
j.Loss = j.NewLoss
}
li.iterator
} // end of mapPartitionsWithIndex
} // end of the iteration loop
// Sum of squares; takes an Array because X is an Array[Double]
def SphereFunc(xs: Array[Double]): Double = {
  xs.foldLeft(0.0)((acc, x) => acc + x * x)
}
In this code I create an RDD and, on every iteration, call mapPartitionsWithIndex to get the data of each partition and update the elements in it.
The problem, as I understand it, is that every call to mapPartitionsWithIndex processes each element of each partition and then creates a new RDD, so the partitions do not stay the same: on each iteration the RDD is divided into partitions again. I expect this to also cause a lot of shuffling.
What I want instead is to create the partitions from the RDD once, at the start, and then perform the desired operation on those partitions until the user-defined number of iterations completes.
Here I am calling mapPartitionsWithIndex iterations times. Is it possible to call mapPartitionsWithIndex only once and then iterate over the partitions for the given number of iterations?
How can I achieve that? I have to run the experiments on a cluster with more than 100 cores.
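To make the question concrete, here is a minimal sketch of the structure being asked about: the iteration loop moved inside the function that handles one partition, so that function is called once per partition rather than once per iteration. It is plain Scala with no Spark dependency so that it can be run locally. The names PartitionLoopSketch, Elem, sphere and iteratePartition are hypothetical, Elem is a simplified stand-in for shape, and the update rule is reduced to "keep the new loss if it is lower", so this is an illustration under those assumptions rather than a drop-in replacement.

```scala
import scala.util.Random

object PartitionLoopSketch {
  // Sum of squares, the same computation as SphereFunc in the question.
  def sphere(xs: Array[Double]): Double = xs.foldLeft(0.0)((acc, x) => acc + x * x)

  // Hypothetical, simplified stand-in for the `shape` class.
  class Elem(var x: Array[Double], var y: Array[Double], var loss: Double) extends Serializable

  // Runs ALL iterations on the data of ONE partition.
  // The partition is materialised once; the loop never leaves it.
  def iteratePartition(part: Iterator[Elem], iterations: Int, rnd: Random): Iterator[Elem] = {
    val li = part.toArray
    if (li.isEmpty) Iterator.empty // minBy would throw on an empty partition
    else {
      for (_ <- 1 to iterations) {
        val localBest = li.minBy(_.loss).x
        li.foreach { e =>
          // Same order as in the question: update the step Y first, then the position X.
          e.y = Array.tabulate(e.x.length)(i => e.y(i) + (e.x(i) - localBest(i)) * rnd.nextDouble())
          e.x = Array.tabulate(e.x.length)(i => e.x(i) + e.y(i))
          // Simplified acceptance rule (an assumption): keep the new loss only if it is lower.
          val newLoss = sphere(e.x)
          if (newLoss < e.loss) e.loss = newLoss
        }
      }
      li.iterator
    }
  }
}

// With Spark (not run here), the function would be applied in a single call,
// assuming rdd is an RDD[PartitionLoopSketch.Elem] created with nP partitions:
//   val result = rdd.mapPartitionsWithIndex(
//     (k, it) => PartitionLoopSketch.iteratePartition(it, iterations, new Random(k)),
//     preservesPartitioning = true)
```

In this shape each partition only ever sees its own local best, and nothing is exchanged between partitions during the loop; whether that matches the intended algorithm is part of the question.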