I get the error shown below when I run:
# --packages takes ONE comma-separated argument. Any whitespace after the
# comma makes the shell split the list into two words, so both coordinates
# are kept on a single line.
spark-submit --master spark://localhost:7077 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf spark.cassandra.connection.host=localhost \
consume_n_stream.py
On the other hand, the same job submitted with a local master:
# --packages takes ONE comma-separated argument. Any whitespace after the
# comma makes the shell split the list into two words, so both coordinates
# are kept on a single line.
spark-submit --master local["*"] \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf spark.cassandra.connection.host=localhost \
consume_n_stream.py
works.
This is consume_n_stream.py:
import logging
from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
def create_spark_connection(url: str):
    """Build (or reuse) the Spark session used by the streaming job.

    The session is named ``SparkDataStreaming``, requests the Cassandra
    connector and Kafka source packages, enables the Cassandra SQL
    extensions and lowers the Spark log level to ``ERROR``.

    Args:
        url: Value stored in ``spark.cassandra.connection.host``.
            NOTE(review): the traceback in this report shows an executor
            dialling ``localhost:9042``, so this host apparently has to be
            reachable from the Spark workers as well, not only from the
            driver. Confirm.

    Returns:
        The session, or ``None`` when creating it raised. The failure is
        logged, not re-raised.
    """
    settings = {
        'spark.jars.packages': (
            "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
        ),
        "spark.sql.extensions": "com.datastax.spark.connector.CassandraSparkExtensions",
        'spark.cassandra.connection.host': url,
        "spark.cassandra.connection.port": "9042",
    }

    session = None
    try:
        builder = SparkSession.builder.appName('SparkDataStreaming')
        # Apply the options in declaration order, one .config() call each.
        for key, value in settings.items():
            builder = builder.config(key, value)
        session = builder.getOrCreate()
        session.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as err:
        logging.error(f"Couldn't create the spark session due to exception {err}")
    return session
def connect_to_kafka(spark_conn, bootstrap_servers, topic):
    """Open a streaming DataFrame on a Kafka topic.

    Args:
        spark_conn: Spark session whose ``readStream`` is used.
        bootstrap_servers: Value for ``kafka.bootstrap.servers``.
        topic: Topic name passed to the ``subscribe`` option.

    Returns:
        The streaming DataFrame, reading from the earliest offsets, or
        ``None`` when setting it up raised. The failure is logged as a
        warning, not re-raised.
    """
    reader_options = {
        'kafka.bootstrap.servers': bootstrap_servers,
        'subscribe': topic,
        'startingOffsets': 'earliest',
    }

    kafka_df = None
    try:
        reader = spark_conn.readStream.format('kafka')
        for key, value in reader_options.items():
            reader = reader.option(key, value)
        kafka_df = reader.load()
        logging.info("kafka dataframe created successfully")
    except Exception as err:
        logging.warning(f"kafka dataframe could not be created because: {err}")
    return kafka_df
def create_cassandra_connection(url: str):
    """Connect to Cassandra with the Python driver.

    Args:
        url: Host name or address used as the single contact point.

    Returns:
        A connected driver session, or ``None`` when the connection could
        not be established. The failure is logged, not re-raised.
    """
    cluster = None
    try:
        cluster = Cluster(contact_points=[url])
        cas_session = cluster.connect()
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        if cluster is not None:
            # The Cluster object is about to be dropped; shut it down so
            # anything it allocated is released instead of lingering until
            # interpreter exit.
            # NOTE(review): the driver may already have shut the cluster
            # down on this path; this assumes shutdown() is idempotent, so
            # confirm against the driver documentation.
            cluster.shutdown()
        return None
    logging.info("connect to Cassandra is successfully")
    return cas_session
def create_keyspace(session):
    """Create the ``spark_streams`` keyspace unless it already exists.

    Args:
        session: Object exposing ``execute(cql)``; called once with the
            CREATE KEYSPACE statement (SimpleStrategy, replication factor 1).
    """
    keyspace_cql = (
        "\n"
        "CREATE KEYSPACE IF NOT EXISTS spark_streams\n"
        "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};\n"
    )
    session.execute(keyspace_cql)
    logging.info("Keyspace created successfully!")
def create_table(session):
    """Create ``spark_streams.test_streaming`` unless it already exists.

    The table is keyed by a UUID ``id``; every other column is TEXT.

    Args:
        session: Object exposing ``execute(cql)``; called once with the
            CREATE TABLE statement.
    """
    table_cql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS spark_streams.test_streaming (\n"
        "id UUID PRIMARY KEY,\n"
        "first_name TEXT,\n"
        "last_name TEXT,\n"
        "gender TEXT,\n"
        "address TEXT,\n"
        "post_code TEXT,\n"
        "email TEXT,\n"
        "username TEXT,\n"
        "registered_date TEXT,\n"
        "phone TEXT,\n"
        "picture TEXT);\n"
    )
    session.execute(table_cql)
    logging.info("Table created successfully!")
def select_df_from_kafka(spark_df):
    """Parse the Kafka ``value`` column as JSON and flatten it into columns.

    Args:
        spark_df: DataFrame with a ``value`` column that can be cast to
            string (the Kafka source frame in this script).

    Returns:
        A DataFrame with one string column per field of the user record.
    """
    column_names = (
        "id",
        "first_name",
        "last_name",
        "gender",
        "address",
        "post_code",
        "email",
        "username",
        "registered_date",
        "phone",
        "picture",
    )
    # Every field is declared as a non-nullable string.
    schema = StructType(
        [StructField(name, StringType(), False) for name in column_names]
    )

    json_strings = spark_df.selectExpr("CAST(value AS STRING)")
    parsed = json_strings.select(from_json(col('value'), schema).alias('data'))
    return parsed.select("data.*")
def main():
    """Stream the ``test_streaming`` Kafka topic into Cassandra.

    Service addresses come from the environment, with the previous
    hard-coded values as defaults:

    * ``CASSANDRA_HOST`` (default ``localhost``)
    * ``KAFKA_BOOTSTRAP_SERVERS`` (default ``localhost:9092``)

    These addresses are also used by the Spark executors, which run on the
    workers. ``localhost`` therefore only works when the executors share
    the driver's network view (e.g. a ``local[*]`` master). On a cluster,
    set them to names every worker can resolve.

    The function returns early (after the helper has logged the cause) if
    the Spark session, the Kafka frame or the Cassandra session cannot be
    created.
    """
    import os  # local import keeps this block self-contained

    cassandra_host = os.environ.get("CASSANDRA_HOST", "localhost")
    kafka_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

    spark_conn = create_spark_connection(cassandra_host)
    if spark_conn is None:
        return

    spark_df = connect_to_kafka(spark_conn, kafka_servers, "test_streaming")
    print(f"spark_df: {spark_df}")
    if spark_df is None:
        # Without this guard select_df_from_kafka(None) would fail with an
        # AttributeError that hides the real (already logged) Kafka error.
        return

    selected_df = select_df_from_kafka(spark_df)
    print(f"selected_df: {selected_df}")

    session = create_cassandra_connection(cassandra_host)
    print(f"session: {session}")
    if session is None:
        return

    try:
        create_keyspace(session)
        print("HERE1")
        create_table(session)
        print("HERE2")
    finally:
        # The Python driver is only needed for the DDL above; the stream
        # itself writes through the Spark connector, so release it now.
        session.cluster.shutdown()

    streaming_query = (selected_df.writeStream.format("org.apache.spark.sql.cassandra")
                       .option('keyspace', 'spark_streams')
                       .option('table', 'test_streaming')
                       .option('checkpointLocation', '/tmp/checkpoint')
                       .start())
    print("HERE3")
    streaming_query.awaitTermination()


if __name__ == "__main__":
    main()
These are my logs:
- Spark worker log
25/01/05 17:57:05 INFO Worker: Asked to launch executor app-20250105175705-0010/0 for SparkDataStreaming
25/01/05 17:57:05 INFO SecurityManager: Changing view acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing view acls groups to:
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls groups to:
25/01/05 17:57:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
25/01/05 17:57:05 INFO ExecutorRunner: Launch command: "/opt/bitnami/java/bin/java" "-cp" "/opt/bitnami/spark/conf/:/opt/bitnami/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=42227" "-Dspark.cassandra.connection.port=9042" "-Djava.net.preferIPv6Addresses=false" "-XX:+IgnoreUnrecognizedVMOptions" "--add-opens=java.base/java.lang=ALL-UNNAMED" "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" "--add-opens=java.base/java.io=ALL-UNNAMED" "--add-opens=java.base/java.net=ALL-UNNAMED" "--add-opens=java.base/java.nio=ALL-UNNAMED" "--add-opens=java.base/java.util=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" "--add-opens=java.base/sun.security.action=ALL-UNNAMED" "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" "-Djdk.reflect.useDirectMethodHandle=false" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@192.168.65.3:42227" "--executor-id" "0" "--hostname" "172.18.0.8" "--cores" "2" "--app-id" "app-20250105175705-0010" "--worker-url" "spark://Worker@172.18.0.8:35967" "--resourceProfileId" "0"
25/01/05 17:57:17 INFO Worker: Asked to kill executor app-20250105175705-0010/0
25/01/05 17:57:17 INFO ExecutorRunner: Runner thread for executor app-20250105175705-0010/0 interrupted
25/01/05 17:57:17 INFO ExecutorRunner: Killing process!
25/01/05 17:57:17 INFO Worker: Executor app-20250105175705-0010/0 finished with state KILLED exitStatus 143
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 0
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20250105175705-0010, execId=0)
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Application app-20250105175705-0010 removed, cleanupLocalDirs = true
25/01/05 17:57:17 INFO Worker: Cleaning up local directories for application app-20250105175705-0010
- Spark master log
25/01/05 17:57:05 INFO Master: Start scheduling for app app-20250105175705-0010 with rpId: 0
25/01/05 17:57:17 INFO Master: Received unregister request from application app-20250105175705-0010
25/01/05 17:57:17 INFO Master: Removing app app-20250105175705-0010
25/01/05 17:57:17 INFO Master: 172.18.0.1:43216 got disassociated, removing it.
25/01/05 17:57:17 INFO Master: 192.168.65.3:42227 got disassociated, removing it.
25/01/05 17:57:17 WARN Master: Got status update for unknown executor app-20250105175705-0010/0
- Output of the main script (driver)
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
spark_df: DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
selected_df: DataFrame[id: string, first_name: string, last_name: string, gender: string, address: string, post_code: string, email: string, username: string, registered_date: string, phone: string, picture: string]
WARNING:cassandra.cluster:Cluster.__init__ called with contact_points specified, but no load_balancing_policy. In the next major version, this will raise an error; please specify a load-balancing policy. (contact_points = ['127.0.0.1'], lbp = None)
WARNING:cassandra.cluster:Downgrading core protocol version from 66 to 65 for 127.0.0.1:9042. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version
WARNING:cassandra.cluster:Downgrading core protocol version from 65 to 5 for 127.0.0.1:9042. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version
session: <cassandra.cluster.Session object at 0x7f383127ec80>
HERE1
HERE2
HERE3
.
.
.
Traceback (most recent call last):
File "/workspace/consume_n_stream.py", line 32, in <module>
streaming_query.awaitTermination()
File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 221, in awaitTerminatio
n
File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 185, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 446ae8f0-92b7-426b-8a9b-67e2c98eb9dc, runId = 8b2ca8
3a-5465-42e1-8f3e-b2c3a00c2a42] terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent
failure: Lost task 0.3 in stage 0.0 (TID 3) (172.18.0.7 executor 0): java.io.IOException: Failed to open native connection to Cassandra at
{localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors()
for more): Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=3bb98041): [com.datastax.oss.driver.api.core.connection.ConnectionI
nitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.
shaded.netty.channel.StacklessClosedChannelException)], Node(endPoint=localhost/[0:0:0:0:0:0:0:1]:9042, hostId=null, hashCode=357e7946): [co
m.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIO
NS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)]
.
.
.
- These are the related services in my docker-compose.yml:
# Spark standalone master. Container port 7077 is published on the same host
# port (the `spark://localhost:7077` URL passed to spark-submit); container
# port 8080 is published as host port 9090 (presumably the master web UI;
# confirm). Attached to the `confluent` network.
spark-master:
image: bitnami/spark:3.5.1
container_name: spark-master
command: bin/spark-class org.apache.spark.deploy.master.Master
ports:
- "9090:8080"
- "7077:7077"
networks:
- confluent
# Spark standalone worker registering with spark://spark-master:7077,
# configured for 2 cores and 1g of memory. No ports are published to the
# host; the container is attached only to the `confluent` network.
# NOTE(review): the worker log shows executors being launched from this
# container, where `localhost` is the worker itself rather than the Docker
# host. Confirm that addresses handed to executors are resolvable on
# `confluent`.
spark-worker:
image: bitnami/spark:3.5.1
container_name: spark-worker
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
depends_on:
- spark-master
environment:
SPARK_MODE: worker
SPARK_WORKER_CORES: 2
SPARK_WORKER_MEMORY: 1g
SPARK_MASTER_URL: spark://spark-master:7077
networks:
- confluent
# Cassandra node. Port 9042 is published to the host, and the container is
# also attached to `confluent` with container name / hostname `cassandra`.
# NOTE(review): `latest` is an unpinned tag; consider pinning a version.
# NOTE(review): the script connects without credentials and succeeds, so
# CASSANDRA_USERNAME / CASSANDRA_PASSWORD appear not to enable
# authentication. Verify whether this image reads them.
cassandra_db:
image: cassandra:latest
container_name: cassandra
hostname: cassandra
ports:
- "9042:9042"
environment:
- MAX_HEAP_SIZE=512M
- HEAP_NEWSIZE=100M
- CASSANDRA_USERNAME=cassandra
- CASSANDRA_PASSWORD=cassandra
networks:
- confluent
# Bridge network that the spark-master, spark-worker and cassandra_db
# services are attached to.
networks:
confluent:
driver: bridge
- This is the Dockerfile for my workspace environment:
# Workspace image: Python 3.11 plus a JDK for PySpark, and the Python
# client libraries used by consume_n_stream.py.
FROM python:3.11
# got java version 17
# update + install share one layer so a cached, stale package index is never
# reused for a later install; the index is removed afterwards to keep the
# layer small.
RUN apt-get update \
    && apt-get install -y default-jdk \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /workspace
RUN pip install --no-cache-dir pyspark==3.5.1 cassandra-driver==3.29.2
Then I start my workspace container:
# --network=host: the container shares the host's network stack, so the ports
# published by the compose services (7077, 9042) are reached via localhost.
# "$(pwd)" is quoted so a working directory containing spaces still mounts.
docker run -it --rm --name=workspace --network=host -v "$(pwd)":/workspace --shm-size=4g workspace bash
Desired result: the Spark workers can write data to the Cassandra database.