In Apache Spark, every DataFrame has a schema. The schema describes each column of the DataFrame: its name, its data type, and so on.
In an ETL process the schema of an incoming feed changes over time: a new column is added or removed, or a column's data type changes. Plain Spark does not enforce a schema on write and does not throw any exception, so the data lake gets polluted with garbage data and data quality degrades.
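To see this, here is a minimal sketch using plain Parquet, assuming a spark-shell session where spark is already available (the /tmp/data/emp_parquet/ path is just a placeholder for this illustration): a feed with an extra column is appended without any complaint, and a later read may silently ignore the new column.

import spark.implicits._

// Initial feed: two columns.
Seq((101, 20000), (102, 30000))
  .toDF("empid", "sal")
  .write.mode("append").parquet("/tmp/data/emp_parquet/")

// Later feed: three columns. Plain Parquet accepts it without any validation.
Seq((201, 27000, 1))
  .toDF("empid", "sal", "dep")
  .write.mode("append").parquet("/tmp/data/emp_parquet/")

// No exception is thrown; depending on which file Spark samples for the schema,
// a read may silently drop the new "dep" column unless mergeSchema is set on read.
spark.read.parquet("/tmp/data/emp_parquet/").printSchema()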
Delta Lake prevents data with an incompatible schema from being written. Delta Lake uses schema validation on write, which means that all new writes to a table are checked for compatibility with the target table's schema at write time. If the schema is not compatible, Delta Lake cancels the transaction and raises an exception to let the user know about the mismatch.
Delta Lake also provides a merge schema option to merge the old schema with the new one, which handles the schema evolution scenario.
For example, suppose we write a two-column DataFrame to disk and then try to append a new data feed with three columns: Delta Lake will raise an exception and will not append the data.
We start with a data set of two columns, employee id and salary.
// createDF is not built into Spark; it comes from the spark-daria library
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._
import org.apache.spark.sql.types.IntegerType

val emp1 = spark.createDF(
  List(
    (101, 20000),
    (102, 30000),
    (103, 25000),
    (104, 50000)
  ), List(
    ("empid", IntegerType, true),
    ("sal", IntegerType, true)
  )
)
emp1.write.format("delta").mode("append").save("/tmp/data/emp/")
spark.read.format("delta").load("/tmp/data/emp/").show()
+-----+-----+
|empid|  sal|
+-----+-----+
|  101|20000|
|  102|30000|
|  103|25000|
|  104|50000|
+-----+-----+
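At this point the table's schema has only the two original columns; this is the schema Delta Lake validates every later write against. A quick way to confirm it (same path as above):

spark.read.format("delta").load("/tmp/data/emp/").printSchema()
// root
//  |-- empid: integer (nullable = true)
//  |-- sal: integer (nullable = true)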
Let's append a new data set with three columns: empid, sal, and dep.
val emp2 = spark.createDF(
  List(
    (201, 27000, 1),
    (202, 35000, 2),
    (203, 25000, 1),
    (204, 55000, 3)
  ), List(
    ("empid", IntegerType, true),
    ("sal", IntegerType, true),
    ("dep", IntegerType, true)
  )
)
emp2.write.format("delta").mode("append").save("/tmp/data/emp/")
This write fails: Delta Lake detects that the incoming data has a column (dep) that does not exist in the target table, aborts the transaction, and raises an exception (an AnalysisException describing the schema mismatch).
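Because the failed write is an aborted transaction, the target table is left exactly as it was. Here is a minimal sketch, reusing the emp2 DataFrame above, that catches the error and confirms the original two-column schema is still in place (the exact exception message varies by Delta Lake version):

import org.apache.spark.sql.AnalysisException

try {
  emp2.write.format("delta").mode("append").save("/tmp/data/emp/")
} catch {
  case e: AnalysisException =>
    // Delta Lake rejected the write; report the mismatch and move on.
    println(s"Write rejected: ${e.getMessage}")
}

// The table still contains only the original empid and sal columns.
spark.read.format("delta").load("/tmp/data/emp/").printSchema()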
Delta Lake's mergeSchema option merges the old schema with the new one and handles this schema evolution scenario. Schema evolution is a feature that allows users to easily change a table's current schema to accommodate data that is changing over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.
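As an alternative to passing the option on every write, Delta Lake also documents a session-level setting that enables automatic schema merging for the whole Spark session; treat the snippet below as a sketch and check that your Delta Lake version supports it. In the example below we stick with the per-write option.

// Enable automatic schema evolution for the whole session instead of per write.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")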
Let's rewrite the code with the mergeSchema option and check the output.
val emp2 = spark.createDF(
  List(
    (201, 27000, 1),
    (202, 35000, 2),
    (203, 25000, 1),
    (204, 55000, 3)
  ), List(
    ("empid", IntegerType, true),
    ("sal", IntegerType, true),
    ("dep", IntegerType, true)
  )
)
emp2.write.format("delta").mode("append").option("mergeSchema", "true").save("/tmp/data/emp/")
spark.read.format("delta").load("/tmp/data/emp/").show()
+-----+-----+----+
|empid|  sal| dep|
+-----+-----+----+
|  201|27000|   1|
|  202|35000|   2|
|  203|25000|   1|
|  204|55000|   3|
|  101|20000|null|
|  102|30000|null|
|  103|25000|null|
|  104|50000|null|
+-----+-----+----+
The example above shows how Delta Lake's mergeSchema option handles a schema evolution scenario. It works well when the save mode is set to append.
If the new feed simply has some additional columns, which is very common these days, use the mergeSchema option with append mode. If the new feed has a completely different schema, use the overwriteSchema option with the save mode set to overwrite. It wipes out the old schema and lets you create a completely new table.
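For example, replacing both the data and the schema of the table looks like this (a sketch against the same path used above):

// Replace the table's contents and its schema in a single operation.
emp2.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save("/tmp/data/emp/")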
Asif is Head of Big Data at TenX with over 12 years of consulting experience.