MarwanRadwanRiad

Error when running PySpark on a Windows cluster

When I run the code locally it works fine, but when I run it on the master and worker it returns this error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 5) (192.168.1.7 executor 0): java.io.IOException: Failed to delete original file 'C:\Users\me\AppData\Local\Temp\spark-a1d455a2-235e-4246-aebe-a5a6c65e3366\executor-82f10b93-6cd7-4046-b825-68015156deb2\spark-be325be5-2631-4e3b-aa8d-18a13f2d58bd\broadcast1438069735290788560' after copy to 'C:\Users\me\AppData\Local\Temp\spark-a1d455a2-235e-4246-aebe-a5a6c65e3366\executor-82f10b93-6cd7-4046-b825-68015156deb2\blockmgr-08f31c2a-5b65-413b-a111-ca1f9fa90aea\37\broadcast_0_python'

And this is the code:
from pyspark import SparkContext, SparkConf, broadcast
from typing import List
import findspark

if __name__ == "__main__":
    findspark.init(r"c:\spark-3.4.1")

    conf = SparkConf().setAppName("MyApp").setMaster("spark://ipaddress:7077")
    conf.set("spark.executor.memory", "4g")
    conf.set("spark.deployMode", "cluster")
    #conf.set("spark.task.partitioner", "random")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.worker.cleanup.interval", "600")
    #conf.set("spark.local.dir", r"E:\aaa")
    conf.set("spark.worker.cleanup.enabled", True)
    conf.set("spark.broadcast.reuse", False)

    sc = SparkContext(conf=conf)

    # Define the array to be shared
    my_array = [1, 2, 3, 4, 5]

    # Broadcast the array to all worker nodes
    broadcast_array = sc.broadcast(my_array)

    # Define the function to be run on the array
    def my_function(x):
        # Access the broadcast array on the executor
        arr = broadcast_array.value
        return [i * x for i in arr]

    # Create an RDD and run the function against the broadcast array
    rdd = sc.parallelize([1, 2, 3, 4])
    results = rdd.flatMap(my_function).collect()

    # Stop the Spark context
    sc.stop()

    # Print the results
    print(results)
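
One thing I have been trying (see the commented-out spark.local.dir line above) is giving Spark its own scratch directory instead of letting the executors work under C:\Users\me\AppData\Local\Temp, since that is where the delete fails. This is only a sketch, not a confirmed fix, and E:\spark_temp is a placeholder path that would have to exist and be writable on the driver and on every worker:

from pyspark import SparkConf, SparkContext

# Sketch only: E:\spark_temp is a placeholder; create it on every machine first.
conf = (
    SparkConf()
    .setAppName("MyApp")
    .setMaster("spark://ipaddress:7077")
    # Keep broadcast/shuffle temp files out of %TEMP% by pointing Spark at a dedicated scratch dir
    .set("spark.local.dir", r"E:\spark_temp")
)

sc = SparkContext(conf=conf)

On standalone workers this property can be overridden by the SPARK_LOCAL_DIRS environment variable set in the worker's environment, so the same directory may need to be configured there as well; I have not confirmed yet whether this makes the delete error go away.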
