
Executors for PySpark app always finish with "state KILLED exitStatus 143"

I get this problem when running

spark-submit --master spark://localhost:7077 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf spark.cassandra.connection.host=localhost \
consume_n_stream.py

but, on the other hand,

spark-submit --master local["*"] \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf spark.cassandra.connection.host=localhost \
consume_n_stream.py

works.

This is consume_n_stream.py:

import logging

from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

def create_spark_connection(url: str):
    s_conn = None
    
    try:
        s_conn = SparkSession.builder \
                .appName('SparkDataStreaming') \
                .config('spark.jars.packages', 
                        "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
                        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
                .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
                .config('spark.cassandra.connection.host', url) \
                .config("spark.cassandra.connection.port", "9042") \
                .getOrCreate()

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn

def connect_to_kafka(spark_conn, bootstrap_servers, topic):
    
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', bootstrap_servers) \
            .option('subscribe', topic) \
            .option('startingOffsets', 'earliest') \
            .load()
            
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df

def create_cassandra_connection(url: str):
    cas_session = None
    try:
        cluster = Cluster(contact_points=[url])
        cas_session = cluster.connect()
        logging.info("connect to Cassandra is successfully")
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
    
    return cas_session

def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    logging.info("Keyspace created successfully!")


def create_table(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.test_streaming (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
    """)

    logging.info("Table created successfully!")

def select_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")

    return sel


if __name__ == "__main__":
        
    spark_conn = create_spark_connection("localhost")

    if spark_conn is not None:
        spark_df = connect_to_kafka(spark_conn, "localhost:9092", "test_streaming")
        print(f"spark_df: {spark_df}")
        selected_df = select_df_from_kafka(spark_df)
        print(f"selected_df: {selected_df}")
        session = create_cassandra_connection("localhost")
        print(f"session: {session}")

        if session is not None:
            create_keyspace(session)
            print("HERE1")
            create_table(session)
            print("HERE2")

            streaming_query = (selected_df.writeStream.format("org.apache.spark.sql.cassandra")
                            .option('keyspace', 'spark_streams')
                            .option('table', 'test_streaming')
                            .option('checkpointLocation', '/tmp/checkpoint')
                            .start())
            
            print("HERE3")
            streaming_query.awaitTermination()
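
As a side note, a stripped-down batch read like the sketch below (hypothetical, not part of my actual pipeline) should exercise the same executor-to-Cassandra connection path as the streaming write, since show() runs tasks on the executors:

# Minimal sketch: a batch read through the same connector settings, so the
# executor-to-Cassandra connection can be tested without Kafka in the loop.
# Assumes the spark_streams.test_streaming table from create_table() exists.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("CassandraConnectivityCheck")
         .config("spark.jars.packages",
                 "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1")
         .config("spark.sql.extensions",
                 "com.datastax.spark.connector.CassandraSparkExtensions")
         .config("spark.cassandra.connection.host", "localhost")
         .config("spark.cassandra.connection.port", "9042")
         .getOrCreate())

# show() forces tasks onto the executors, which is where the streaming job fails.
(spark.read.format("org.apache.spark.sql.cassandra")
      .options(keyspace="spark_streams", table="test_streaming")
      .load()
      .show(5))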

This is my logging:

  • Spark worker log
25/01/05 17:57:05 INFO Worker: Asked to launch executor app-20250105175705-0010/0 for SparkDataStreaming
25/01/05 17:57:05 INFO SecurityManager: Changing view acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing view acls groups to: 
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls groups to: 
25/01/05 17:57:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
25/01/05 17:57:05 INFO ExecutorRunner: Launch command: "/opt/bitnami/java/bin/java" "-cp" "/opt/bitnami/spark/conf/:/opt/bitnami/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=42227" "-Dspark.cassandra.connection.port=9042" "-Djava.net.preferIPv6Addresses=false" "-XX:+IgnoreUnrecognizedVMOptions" "--add-opens=java.base/java.lang=ALL-UNNAMED" "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" "--add-opens=java.base/java.io=ALL-UNNAMED" "--add-opens=java.base/java.net=ALL-UNNAMED" "--add-opens=java.base/java.nio=ALL-UNNAMED" "--add-opens=java.base/java.util=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" "--add-opens=java.base/sun.security.action=ALL-UNNAMED" "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" "-Djdk.reflect.useDirectMethodHandle=false" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@192.168.65.3:42227" "--executor-id" "0" "--hostname" "172.18.0.8" "--cores" "2" "--app-id" "app-20250105175705-0010" "--worker-url" "spark://Worker@172.18.0.8:35967" "--resourceProfileId" "0"
25/01/05 17:57:17 INFO Worker: Asked to kill executor app-20250105175705-0010/0
25/01/05 17:57:17 INFO ExecutorRunner: Runner thread for executor app-20250105175705-0010/0 interrupted
25/01/05 17:57:17 INFO ExecutorRunner: Killing process!
25/01/05 17:57:17 INFO Worker: Executor app-20250105175705-0010/0 finished with state KILLED exitStatus 143
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 0
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20250105175705-0010, execId=0)
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Application app-20250105175705-0010 removed, cleanupLocalDirs = true
25/01/05 17:57:17 INFO Worker: Cleaning up local directories for application app-20250105175705-0010
  • Spark master log
25/01/05 17:57:05 INFO Master: Start scheduling for app app-20250105175705-0010 with rpId: 0
25/01/05 17:57:17 INFO Master: Received unregister request from application app-20250105175705-0010
25/01/05 17:57:17 INFO Master: Removing app app-20250105175705-0010
25/01/05 17:57:17 INFO Master: 172.18.0.1:43216 got disassociated, removing it.
25/01/05 17:57:17 INFO Master: 192.168.65.3:42227 got disassociated, removing it.
25/01/05 17:57:17 WARN Master: Got status update for unknown executor app-20250105175705-0010/0
  • main code (driver) log
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
spark_df: DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
selected_df: DataFrame[id: string, first_name: string, last_name: string, gender: string, address: string, post_code: string, email: string, username: string, registered_date: string, phone: string, picture: string]
WARNING:cassandra.cluster:Cluster.__init__ called with contact_points specified, but no load_balancing_policy. In the next major version, this will raise an error; please specify a load-balancing policy. (contact_points = ['127.0.0.1'], lbp = None)
WARNING:cassandra.cluster:Downgrading core protocol version from 66 to 65 for 127.0.0.1:9042. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version
WARNING:cassandra.cluster:Downgrading core protocol version from 65 to 5 for 127.0.0.1:9042. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version
session: <cassandra.cluster.Session object at 0x7f383127ec80>
HERE1
HERE2
HERE3
.
.
.
Traceback (most recent call last):
  File "/workspace/consume_n_stream.py", line 32, in <module>
    streaming_query.awaitTermination()
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 221, in awaitTermination
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 185, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 446ae8f0-92b7-426b-8a9b-67e2c98eb9dc, runId = 8b2ca83a-5465-42e1-8f3e-b2c3a00c2a42] terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (172.18.0.7 executor 0): java.io.IOException: Failed to open native connection to Cassandra at {localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors() for more): Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=3bb98041): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)], Node(endPoint=localhost/[0:0:0:0:0:0:0:1]:9042, hostId=null, hashCode=357e7946): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)]
.
.
.
  • These are the related services in my docker-compose.yml
    spark-master:
        image: bitnami/spark:3.5.1
        container_name: spark-master
        command: bin/spark-class org.apache.spark.deploy.master.Master
        ports:
            - "9090:8080"
            - "7077:7077"
        networks:
            - confluent
    spark-worker:
        image: bitnami/spark:3.5.1
        container_name: spark-worker
        command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
        depends_on:
            - spark-master
        environment:
            SPARK_MODE: worker
            SPARK_WORKER_CORES: 2
            SPARK_WORKER_MEMORY: 1g
            SPARK_MASTER_URL: spark://spark-master:7077
        networks:
            - confluent

    cassandra_db:
        image: cassandra:latest
        container_name: cassandra
        hostname: cassandra
        ports:
            - "9042:9042"
        environment:
            - MAX_HEAP_SIZE=512M
            - HEAP_NEWSIZE=100M
            - CASSANDRA_USERNAME=cassandra
            - CASSANDRA_PASSWORD=cassandra
        networks:
            - confluent

networks:
  confluent:
    driver: bridge
  • This is my workspace environment (Dockerfile)
FROM python:3.11

# installs Java 17 (default-jdk)
RUN apt-get update
RUN apt-get install default-jdk -y

WORKDIR /workspace

RUN pip install pyspark==3.5.1 cassandra-driver==3.29.2

Then I start my workspace container:

docker run -it --rm --name=workspace --network=host -v $(pwd):/workspace --shm-size=4g workspace bash
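
From this workspace container the driver-side connection to Cassandra does work (the session object and the HERE1/HERE2 prints above show that). A minimal check along the lines of what create_cassandra_connection() does, included here only for illustration, would be:

# Minimal sketch: confirm Cassandra is reachable from the workspace container
# over the published 9042 port (mirrors create_cassandra_connection() above).
from cassandra.cluster import Cluster

cluster = Cluster(contact_points=["localhost"], port=9042)
session = cluster.connect()
# system.local always exists, so this only tests basic connectivity.
print(session.execute("SELECT release_version FROM system.local").one())
cluster.shutdown()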

Goal: the Spark workers should be able to write data to the Cassandra database.
