I have a `test` table in MySQL with `id` and `name` columns, like below:
```
+----+-------+
| id | name  |
+----+-------+
| 1  | Name1 |
+----+-------+
| 2  | Name2 |
+----+-------+
| 3  | Name3 |
+----+-------+
```
I am using a Spark `DataFrame` to read this data (via JDBC) and modify it like this:
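```java
// Read through the JDBC-backed "test" view and append " - new" to each name
Dataset<Row> modified = sparkSession.sql("select id, concat(name,' - new') as name from test");
// Overwrite the same MySQL table the data was read from
modified.write().mode("overwrite").jdbc(AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);
```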
My problem is that in overwrite mode, Spark drops the existing table and creates a new one, but doesn't insert any data.
I tried the same program reading from a CSV file (with the same data as the `test` table) and overwriting, and that worked.
Am I missing something here?
The problem is in your code. Because you overwrite the same table you're trying to read from, you effectively obliterate all the data before Spark can actually access it.
Remember that Spark is lazy. When you create a `Dataset`, Spark fetches the required metadata but doesn't load the data, so there is no magic cache that preserves the original content. Data is loaded only when it is actually required, which here is when you execute the `write` action, and by the time writing starts there is no more data to be fetched.
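The effect can be reproduced outside Spark. The following is a minimal sketch in plain Python, assuming SQLite (`sqlite3`) as a stand-in for MySQL and a generator as a stand-in for Spark's lazy read; `lazy_select` and `overwrite` are hypothetical helpers, not Spark APIs. Because the overwrite drops and recreates the table before the source is ever read, nothing gets inserted:

```python
import sqlite3


def lazy_select(conn, table):
    """Hypothetical stand-in for a lazy Spark read: no SQL runs until iteration."""
    # A generator body only starts executing on the first next() call.
    yield from conn.execute(f"SELECT id, name || ' - new' FROM {table}").fetchall()


def overwrite(conn, table, rows):
    """Hypothetical stand-in for mode("overwrite"): drop, recreate, then insert."""
    conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(f"CREATE TABLE {table} (id INTEGER, name TEXT)")
    # The lazy source is evaluated only here, against the freshly emptied table.
    batch = list(rows)
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", batch)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO test VALUES (?, ?)",
                 [(1, "Name1"), (2, "Name2"), (3, "Name3")])
conn.commit()

modified = lazy_select(conn, "test")  # nothing has been read yet
overwrite(conn, "test", modified)
print(conn.execute("SELECT COUNT(*) FROM test").fetchone()[0])  # prints 0
```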
What you need is something like this:
- Create a `Dataset`.
- Apply the required transformations and write the data to an intermediate MySQL table.
- `TRUNCATE` the original input and `INSERT INTO ... SELECT` from the intermediate table, or `DROP` the original table and rename the intermediate table in its place.
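A minimal sketch of the intermediate-table approach, again assuming SQLite as a stand-in for MySQL and doing the transformation in SQL rather than in Spark; `test_tmp` is a hypothetical staging-table name. SQLite has no `TRUNCATE`, so `DELETE FROM` plays that role here:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO test VALUES (?, ?)",
                 [(1, "Name1"), (2, "Name2"), (3, "Name3")])
conn.commit()

# Step 2: write the transformed rows to an intermediate table.
# In the Spark job this would be the jdbc() write to a staging table.
conn.execute("DROP TABLE IF EXISTS test_tmp")
conn.execute("CREATE TABLE test_tmp AS SELECT id, name || ' - new' AS name FROM test")

# Step 3: empty the original and copy the staged rows back.
# (MySQL would use TRUNCATE TABLE test instead of DELETE FROM.)
conn.execute("DELETE FROM test")
conn.execute("INSERT INTO test SELECT id, name FROM test_tmp")
conn.execute("DROP TABLE test_tmp")
conn.commit()

print(conn.execute("SELECT id, name FROM test ORDER BY id").fetchall())
# [(1, 'Name1 - new'), (2, 'Name2 - new'), (3, 'Name3 - new')]
```

For the drop-and-rename variant, the last three statements would become `DROP TABLE test` followed by `ALTER TABLE test_tmp RENAME TO test` (in MySQL, `RENAME TABLE test_tmp TO test`).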
An alternative, but less favorable, approach would be:
- Create a `Dataset`.
- Apply the required transformations and write the data to a persistent Spark table.
- `TRUNCATE` the original input.
- Read the data back and save it to the original MySQL table.
- Drop the Spark table.
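A rough sketch of this variant, under the assumption that a file-backed SQLite database can stand in for Spark's persistent table storage and an in-memory one for MySQL; `test_copy` and `spark_tables.db` are hypothetical names. What it illustrates is that the transformed rows live in durable storage, not in a cache, while the original table is emptied:

```python
import os
import sqlite3
import tempfile

# Stand-ins (assumptions): in-memory SQLite plays the MySQL source,
# a file-backed SQLite database plays Spark's persistent table storage.
mysql = sqlite3.connect(":memory:")
mysql.execute("CREATE TABLE test (id INTEGER, name TEXT)")
mysql.executemany("INSERT INTO test VALUES (?, ?)",
                  [(1, "Name1"), (2, "Name2"), (3, "Name3")])
mysql.commit()

with tempfile.TemporaryDirectory() as warehouse_dir:
    warehouse = sqlite3.connect(os.path.join(warehouse_dir, "spark_tables.db"))

    # Steps 1-2: read, transform, and persist to durable storage.
    rows = mysql.execute("SELECT id, name || ' - new' FROM test").fetchall()
    warehouse.execute("CREATE TABLE test_copy (id INTEGER, name TEXT)")
    warehouse.executemany("INSERT INTO test_copy VALUES (?, ?)", rows)
    warehouse.commit()

    # Step 3: empty the original (MySQL: TRUNCATE TABLE test).
    mysql.execute("DELETE FROM test")

    # Step 4: read the persisted copy back and save it to the original table.
    saved = warehouse.execute("SELECT id, name FROM test_copy ORDER BY id").fetchall()
    mysql.executemany("INSERT INTO test VALUES (?, ?)", saved)
    mysql.commit()

    # Step 5: drop the intermediate table and release the file.
    warehouse.execute("DROP TABLE test_copy")
    warehouse.close()

print(mysql.execute("SELECT id, name FROM test ORDER BY id").fetchall())
# [(1, 'Name1 - new'), (2, 'Name2 - new'), (3, 'Name3 - new')]
```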
We cannot stress enough that using Spark `persist` is not the way to go. Even with a conservative storage level (such as `MEMORY_AND_DISK_SER_2`), cached data can be lost (for example, on node failures), leading to silent correctness errors.
I believe all the steps above are unnecessary. Here’s what you need to do:
Create a dataset `A`:
```scala
val A = spark.read.parquet("....")
```
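Read the table to be updated as DataFrame `B`, making sure caching is enabled for it:
```scala
// url and connectionProperties are placeholders for your JDBC connection settings
val B = spark.read.jdbc(url, "mytable", connectionProperties).cache()
```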
Run an action such as `B.count()`. This forces execution and caches the table, depending on the chosen storage level.
Now you can apply a transformation such as:
```scala
val C = A.union(B)
```
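Then write `C` back to the database, for example with `C.write.mode("overwrite").jdbc(url, "mytable", connectionProperties)`, where `url` and `connectionProperties` are placeholders for your connection settings.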
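A minimal sketch of why forcing the read first helps, assuming SQLite as a stand-in for MySQL and `fetchall()` as a loose analogue of `cache()` followed by `count()`: the rows are pulled into memory before the table is dropped, so the overwrite has data to insert.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO test VALUES (?, ?)",
                 [(1, "Name1"), (2, "Name2"), (3, "Name3")])
conn.commit()

# Loose analogue of .cache() plus an action: read every row now,
# before anything touches the table.
materialized = conn.execute("SELECT id, name || ' - new' FROM test").fetchall()

# Loose analogue of mode("overwrite"): drop and recreate the table, then
# insert from the in-memory copy instead of re-reading the source.
conn.execute("DROP TABLE test")
conn.execute("CREATE TABLE test (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO test VALUES (?, ?)", materialized)
conn.commit()

print(conn.execute("SELECT id, name FROM test ORDER BY id").fetchall())
# [(1, 'Name1 - new'), (2, 'Name2 - new'), (3, 'Name3 - new')]
```

One difference worth keeping in mind: a Python list cannot be evicted, whereas Spark recomputes lost or evicted cached partitions from their lineage, which here would mean re-reading the table that is being overwritten.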
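Reading and writing to the same table:
```python
# df_2 is the DataFrame read from the table that will be overwritten
cols_df = df_2.columns
# collect() pulls all rows to the driver; broadcast() ships them to the executors
broad_cast_var = spark_context.broadcast(df_2.collect())
# Rebuild a DataFrame from the driver-side copy, independent of the source table
df_3 = sqlContext.createDataFrame(broad_cast_var.value, cols_df)
```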
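Reading and writing to the same table with some modification:
```python
cols_df = df_2.columns
# Materialize the rows on the driver before the table is overwritten
broad_cast_var = spark_context.broadcast(df_2.collect())

def update_x(x):
    # Rows are tuples: add 311 to the first column, keep the rest unchanged
    return (x[0] + 311, *x[1:])

rdd_2_1 = spark_context.parallelize(broad_cast_var.value).map(update_x)
df_3 = sqlContext.createDataFrame(rdd_2_1, cols_df)
```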
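The row-update function can be checked on its own without a cluster. PySpark's `Row` is a subclass of `tuple`, so this sketch assumes plain tuples as a stand-in for the output of `df_2.collect()`; the `+ 311` offset on the first column is taken from the snippet above. Indexing with `x[0]` matters, since adding an `int` to a whole tuple raises `TypeError`:

```python
def update_x(x):
    """Return a copy of the row with 311 added to its first field."""
    # Rows behave like tuples: x[0] is the first column, x[1:] the rest.
    return (x[0] + 311, *x[1:])


collected = [(1, "Name1"), (2, "Name2"), (3, "Name3")]  # stand-in for df_2.collect()
updated = [update_x(row) for row in collected]
print(updated)  # [(312, 'Name1'), (313, 'Name2'), (314, 'Name3')]

# Applying + 311 to the whole row instead of its first field fails.
try:
    collected[0] + 311
except TypeError as exc:
    print("TypeError:", exc)
```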