Messages are streaming from the Kafka producer, but they are not getting saved in the database. I am using MySQL 8.0.20. Am I missing anything?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo1")
      .option("startingOffsets", "earliest")
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    personDF.createOrReplaceTempView("persontab")

    spark.sql("""select min(stock_price) as min_stock_price, max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream.foreachBatch { (batchDF, batchId) =>
        println("inside the foreachbatch")
        batchDF.show()
        batchDF.write.format("jdbc")
          .mode("append").option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved")
      }
      .outputMode("complete")
      .start()
  }
}
Below is the pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.15</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
I have to include two more SQL queries. Do I need to write a separate foreachBatch for each query, or is there a way to optimize the code? Any suggestions, please.
Answers:
Method 1
package mysql.kafka.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest") // From starting
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val personDF = personStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    personDF.createOrReplaceTempView("persontab")

    spark.sql("""select min(stock_price) as min_stock_price, max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream.outputMode("complete").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()
        batchDF.write.format("jdbc")
          .mode("append").option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved")
      }
      .start()
      .awaitTermination()
  }
}
The above code is working now. I removed the outputMode("complete") that sat just before the start() call and placed it right after writeStream instead; after that change it worked fine.
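The follow-up question about running two more SQL queries is not covered above. A minimal sketch (not from the original answer, and assuming each query writes to its own MySQL table) is to factor the JDBC write and the query start-up into small helpers, so the foreachBatch body is not repeated for every query; the connection details below are the same placeholders used in the code above.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object JdbcStreamWriter {

  // Write one micro-batch result to the given MySQL table.
  def saveToMysql(batchDF: DataFrame, table: String): Unit = {
    batchDF.write
      .format("jdbc")
      .mode("append")
      .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
      .option("dbtable", table)
      .option("user", "root")
      .option("password", "root")
      .save()
  }

  // Start one streaming query that runs the given SQL against the registered
  // temp view ("persontab" above) and appends each batch result to the table.
  def startQuery(spark: SparkSession, sql: String, table: String): StreamingQuery = {
    spark.sql(sql)
      .writeStream
      .outputMode("complete")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        saveToMysql(batchDF, table)
      }
      .start()
  }
}

// Usage sketch: start one streaming query per SQL statement, then block until
// any of them terminates. The second and third queries/tables are placeholders.
// val q1 = JdbcStreamWriter.startQuery(spark,
//   "select min(stock_price) as min_stock_price, max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab",
//   "max_min_avg")
// val q2 = JdbcStreamWriter.startQuery(spark, "<second query>", "<second table>")
// val q3 = JdbcStreamWriter.startQuery(spark, "<third query>", "<third table>")
// spark.streams.awaitAnyTermination()

Each query still gets its own foreachBatch under the hood, but the duplicated write logic lives in one place; spark.streams.awaitAnyTermination() keeps the application alive while all of the queries run.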
All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, or CC BY-SA 4.0.