December, 2021

Schema Enforcement and Schema Evolution in Delta Lake

Part – 3


What Is a Schema in Apache Spark?

In Apache Spark, every DataFrame has a schema. The schema holds information about each column, such as its name and data type.
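
For instance, assuming a SparkSession named spark (as in the examples later in this post), the schema of any DataFrame can be inspected with printSchema. This is a minimal sketch with illustrative column names:

import spark.implicits._

// A small DataFrame with two integer columns (illustrative data)
val df = Seq((101, 20000), (102, 30000)).toDF("empid", "sal")

// printSchema lists each column with its name, data type, and nullability
df.printSchema()
// root
//  |-- empid: integer (nullable = false)
//  |-- sal: integer (nullable = false)

// The same information is available programmatically as a StructType
println(df.schema)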

How Spark Handles Schema Changes

In an ETL process, the schema of incoming data can change over time: new columns are added or removed, or a column's data type changes. By default, Spark does not enforce the target schema and does not throw any exception, so the data lake gets polluted with garbage data and data quality degrades.
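
As a rough illustration (using a plain Parquet path rather than a Delta table, with hypothetical data and a hypothetical /tmp path), appending a DataFrame with an extra column succeeds without any error at write time, even though the files on disk now disagree about the schema:

import spark.implicits._

// First feed: two columns written as plain Parquet
val feed1 = Seq((101, 20000)).toDF("empid", "sal")
feed1.write.mode("append").parquet("/tmp/data/emp_parquet/")

// Second feed: an extra "dep" column
val feed2 = Seq((201, 27000, 1)).toDF("empid", "sal", "dep")

// Plain Parquet does not validate the incoming schema against existing files,
// so this append succeeds silently and the directory now mixes two schemas
feed2.write.mode("append").parquet("/tmp/data/emp_parquet/")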

How Delta Lake Implements Schema Enforcement and Schema Evolution

Delta Lake prevents data with an incompatible schema from being written. It uses schema validation on write, which means that all new writes to a table are checked for compatibility with the target table's schema at write time. If the schema is not compatible, Delta Lake cancels the transaction and raises an exception to let the user know about the mismatch.

Delta Lake also provides a merge-schema option that merges the old schema with the new one, which handles the schema evolution scenario.

Code Example

For example, we create a DataFrame with two columns and write it to disk. We then try to append a new data feed with three columns; Delta Lake will raise an exception and will not append the data.

We start with a dataset of two columns: employee id and salary.

import org.apache.spark.sql.types.IntegerType
// createDF is a helper provided by the spark-daria library
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

val emp1 = spark.createDF(
  List(
    (101, 20000),
    (102, 30000),
    (103, 25000),
    (104, 50000)
  ), List(
    ("empid", IntegerType, true),
    ("sal", IntegerType, true)
  )
)

emp1.write.format("delta").mode("append").save("/tmp/data/emp/")

spark.read.format("delta").load("/tmp/data/emp/").show()

empid | sal
------|------
101   | 20000
102   | 30000
103   | 25000
104   | 50000

Let's append a new dataset with three columns: empid, sal, and dep.

val emp2 = spark.createDF(
  List(
    (201, 27000, 1),
    (202, 35000, 2),
    (203, 25000, 1),
    (204, 55000, 3)
  ), List(
    ("empid", IntegerType, true),
    ("sal", IntegerType, true),
    ("dep", IntegerType, true)
  )
)

emp2.write.format("delta").mode("append").save("/tmp/data/emp/")

This write throws the following exception:

org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")'.

As noted above, Delta Lake's merge-schema option merges the old schema with the new one to handle schema evolution. Schema evolution is a feature that allows users to easily change a table's current schema to accommodate data that is changing over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

Let's rewrite the code with the mergeSchema option and check the output.

val emp2 = spark.createDF(
  List(
    (201, 27000, 1),
    (202, 35000, 2),
    (203, 25000, 1),
    (204, 55000, 3)
  ), List(
    ("empid", IntegerType, true),
    ("sal", IntegerType, true),
    ("dep", IntegerType, true)
  )
)

emp2.write.format("delta").mode("append").option("mergeSchema", "true").save("/tmp/data/emp/")

spark.read.format("delta").load("/tmp/data/emp/").show()

empid | sal   | dep
------|-------|-----
201   | 27000 | 1
202   | 35000 | 2
203   | 25000 | 1
204   | 55000 | 3
101   | 20000 | null
102   | 30000 | null
103   | 25000 | null
104   | 50000 | null

The example above shows how Delta Lake's "mergeSchema" option can be used to handle the schema evolution scenario. It works well when the save mode is set to "append".

If the incoming data has some additional columns, which is very common these days, you can use the "mergeSchema" option with append mode. If the new data has a totally different schema, you should opt for the "overwriteSchema" option with the save mode set to "overwrite". It wipes out the old schema and lets you create a completely new table.
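
A minimal sketch of the second case, assuming the same /tmp/data/emp/ table and an illustrative replacement dataset whose columns are completely different:

import spark.implicits._

// A replacement dataset with an entirely different set of columns (illustrative)
val dept = Seq((1, "Sales"), (2, "Finance")).toDF("depid", "depname")

// overwrite mode plus overwriteSchema replaces both the data and the schema
// of the existing Delta table in a single transaction
dept.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save("/tmp/data/emp/")

spark.read.format("delta").load("/tmp/data/emp/").printSchema()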

Asif Mughal

Asif is Head of Big Data at TenX with over 12 years of consulting experience.