Executors for PySpark app always finish with "state KILLED exitStatus 143"

I run into this problem when submitting the job to a standalone cluster:

spark-submit --master spark://localhost:7077 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1, \
org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf \

On the other hand, the same job in local mode

spark-submit --master local["*"] \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1, \
org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf \

works.


import logging

from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

def create_spark_connection(url: str):
    s_conn = None

    try:
        # Build a session with the Cassandra and Kafka connectors on the classpath.
        s_conn = SparkSession.builder \
                .appName('SparkDataStreaming') \
                .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
                        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
                .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
                .config('spark.cassandra.connection.host', url) \
                .config("spark.cassandra.connection.port", "9042") \
                .getOrCreate()

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn

def connect_to_kafka(spark_conn, bootstrap_servers, topic):
    spark_df = None

    try:
        # Subscribe to the topic as a streaming source, reading from the earliest offset.
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', bootstrap_servers) \
            .option('subscribe', topic) \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df

def create_cassandra_connection(url: str):
    cas_session = None

    try:
        # Driver-side connection using the Python cassandra-driver.
        cluster = Cluster(contact_points=[url])
        cas_session = cluster.connect()
        logging.info("Connected to Cassandra successfully")
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
    return cas_session

def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    logging.info("Keyspace created successfully!")

def create_table(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.test_streaming (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
    """)
    logging.info("Table created successfully!")

def select_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")

    return sel

if __name__ == "__main__":
    spark_conn = create_spark_connection("localhost")

    if spark_conn is not None:
        spark_df = connect_to_kafka(spark_conn, "localhost:9092", "test_streaming")
        print(f"spark_df: {spark_df}")
        selected_df = select_df_from_kafka(spark_df)
        print(f"selected_df: {selected_df}")
        session = create_cassandra_connection("localhost")
        print(f"session: {session}")

        if session is not None:
            create_keyspace(session)
            create_table(session)

            streaming_query = (selected_df.writeStream.format("org.apache.spark.sql.cassandra")
                            .option('keyspace', 'spark_streams')
                            .option('table', 'test_streaming')
                            .option('checkpointLocation', '/tmp/checkpoint')
                            .start())

            streaming_query.awaitTermination()

Here are my logs:

  • Spark worker log
25/01/05 17:57:05 INFO Worker: Asked to launch executor app-20250105175705-0010/0 for SparkDataStreaming
25/01/05 17:57:05 INFO SecurityManager: Changing view acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing view acls groups to: 
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls groups to: 
25/01/05 17:57:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
25/01/05 17:57:05 INFO ExecutorRunner: Launch command: "/opt/bitnami/java/bin/java" "-cp" "/opt/bitnami/spark/conf/:/opt/bitnami/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=42227" "-Dspark.cassandra.connection.port=9042" "" "-XX:+IgnoreUnrecognizedVMOptions" "--add-opens=java.base/java.lang=ALL-UNNAMED" "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" "--add-opens=java.base/" "--add-opens=java.base/" "--add-opens=java.base/java.nio=ALL-UNNAMED" "--add-opens=java.base/java.util=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" "--add-opens=java.base/" "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" "--add-opens=java.base/" "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" "" "-Djdk.reflect.useDirectMethodHandle=false" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@" "--executor-id" "0" "--hostname" "" "--cores" "2" "--app-id" "app-20250105175705-0010" "--worker-url" "spark://Worker@" "--resourceProfileId" "0"
25/01/05 17:57:17 INFO Worker: Asked to kill executor app-20250105175705-0010/0
25/01/05 17:57:17 INFO ExecutorRunner: Runner thread for executor app-20250105175705-0010/0 interrupted
25/01/05 17:57:17 INFO ExecutorRunner: Killing process!
25/01/05 17:57:17 INFO Worker: Executor app-20250105175705-0010/0 finished with state KILLED exitStatus 143
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 0
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20250105175705-0010, execId=0)
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Application app-20250105175705-0010 removed, cleanupLocalDirs = true
25/01/05 17:57:17 INFO Worker: Cleaning up local directories for application app-20250105175705-0010
  • Spark master log
25/01/05 17:57:05 INFO Master: Start scheduling for app app-20250105175705-0010 with rpId: 0
25/01/05 17:57:17 INFO Master: Received unregister request from application app-20250105175705-0010
25/01/05 17:57:17 INFO Master: Removing app app-20250105175705-0010
25/01/05 17:57:17 INFO Master: got disassociated, removing it.
25/01/05 17:57:17 INFO Master: got disassociated, removing it.
25/01/05 17:57:17 WARN Master: Got status update for unknown executor app-20250105175705-0010/0
  • Driver (main code) log
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
spark_df: DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
selected_df: DataFrame[id: string, first_name: string, last_name: string, gender: string, address: string, post_code: string, email: string, username: string, registered_date: string, phone: string, picture: string]
WARNING:cassandra.cluster:Cluster.__init__ called with contact_points specified, but no load_balancing_policy. In the next major version, this will raise an error; please specify a load-balancing policy. (contact_points = [''], lbp = None)
WARNING:cassandra.cluster:Downgrading core protocol version from 66 to 65 for To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster.
WARNING:cassandra.cluster:Downgrading core protocol version from 65 to 5 for To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster.
session: <cassandra.cluster.Session object at 0x7f383127ec80>
Traceback (most recent call last):
  File "/workspace/", line 32, in <module>
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/", line 221, in awaitTerminatio
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/", line 1322, in __call__
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/", line 185, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 446ae8f0-92b7-426b-8a9b-67e2c98eb9dc, runId = 8b2ca83a-5465-42e1-8f3e-b2c3a00c2a42] terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) ( executor 0): Failed to open native connection to Cassandra at {localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors() for more): Node(endPoint=localhost/, hostId=null, hashCode=3bb98041): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.], Node(endPoint=localhost/[0:0:0:0:0:0:0:1]:9042, hostId=null, hashCode=357e7946): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (]
  • The related services in my docker-compose.yml
services:
    spark-master:
        image: bitnami/spark:3.5.1
        container_name: spark-master
        command: bin/spark-class org.apache.spark.deploy.master.Master
        ports:
            - "9090:8080"
            - "7077:7077"
        networks:
            - confluent

    spark-worker:
        image: bitnami/spark:3.5.1
        container_name: spark-worker
        command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
        depends_on:
            - spark-master
        environment:
            SPARK_MODE: worker
            SPARK_WORKER_CORES: 2
            SPARK_WORKER_MEMORY: 1g
            SPARK_MASTER_URL: spark://spark-master:7077
        networks:
            - confluent

    cassandra:
        image: cassandra:latest
        container_name: cassandra
        hostname: cassandra
        ports:
            - "9042:9042"
        environment:
            - MAX_HEAP_SIZE=512M
            - HEAP_NEWSIZE=100M
            - CASSANDRA_USERNAME=cassandra
            - CASSANDRA_PASSWORD=cassandra
        networks:
            - confluent

networks:
  confluent:
    driver: bridge
  • My workspace environment (Dockerfile)
FROM python:3.11

# got java version 17
RUN apt-get update
RUN apt-get install default-jdk -y

WORKDIR /workspace

RUN pip install pyspark==3.5.1 cassandra-driver==3.29.2

Then I start my workspace container:

docker run -it --rm --name=workspace --network=host -v $(pwd):/workspace --shm-size=4g workspace bash
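
A note on the `exitStatus 143` in the worker log: by the usual POSIX shell convention, an exit status above 128 means the process was terminated by signal number `status - 128`, so 143 would correspond to signal 15 (SIGTERM). That seems consistent with the worker being asked to kill the executor after the application unregistered, rather than with the executor crashing by itself. A minimal sketch for decoding such a status follows; the helper name `describe_exit_status` is made up for illustration and is not part of Spark.

```python
import signal


def describe_exit_status(status: int) -> str:
    """Describe a process exit status using the 128 + signal-number convention."""
    if status > 128:
        try:
            # Statuses above 128 conventionally encode 128 + signal number.
            return f"killed by {signal.Signals(status - 128).name}"
        except ValueError:
            # status - 128 is not a signal number known on this platform.
            pass
    return f"exited with code {status}"


print(describe_exit_status(143))
```

On Linux this reports SIGTERM for 143, which suggests the status is a consequence of the failed query shutting the application down and not the root cause.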

Desired result: the Spark workers can write data to the Cassandra database.
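
Since the error says the executor could not reach `localhost:9042`, one thing that may be worth checking is which host names actually accept TCP connections on port 9042 from each container. The sketch below is only a diagnostic under that assumption, not a fix; `can_connect` is a hypothetical helper name, and it uses nothing beyond Python's standard `socket` module.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and tries each returned address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


# Example usage (run once in the workspace container and once in spark-worker):
#   print(can_connect("localhost", 9042))
#   print(can_connect("cassandra", 9042))
```

If `localhost` succeeds in the workspace container (started with `--network=host`) but fails inside `spark-worker`, while `cassandra` behaves the other way around, that would point to the driver and the executors resolving the Cassandra address differently.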

