How can I use Delta time travel to compare two versions of a table and get changes similar to CDC?

I can see two options:

  1. In SQL you have an EXCEPT/MINUS query that compares all data against another table. I assume that can be used here too, correct? But is it fast enough when the versions being compared keep growing and every comparison has to check all rows against all rows of the latest version?

  2. Does Delta compute some kind of hash per row so that it can do this very fast, or is this very time-consuming for Delta?


Found on Slack

2 Answers

You can compute the difference between two versions of the table, but as you guessed, it's expensive to do. It's also tricky to compute the actual difference when the Delta table has changes other than appends.

Usually when people ask about this, they're trying to design their own system that gives them exactly-once processing of data from Delta to somewhere else; Spark streaming with the Delta source already exists to do this.

If you do want to write your own, you can read the transaction log directly (the protocol spec is at https://github.com/delta-io/delta/blob/master/PROTOCOL.md) and use the actions in the versions between the two you're comparing to figure out which files contain changes and need to be read.
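
A minimal sketch of that log-reading approach, under a few assumptions: the table lives at /tmp/delta/t2 (the path used elsewhere in this thread), the JSON commit files for the versions of interest have not yet been cleaned up, and `changedFiles` is a hypothetical helper name, not part of Delta. Per the protocol spec, each commit is a file under _delta_log named with the zero-padded 20-digit version number and holding one JSON action per line.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, get_json_object, input_file_name}

val spark = SparkSession.builder().appName("delta-log-diff").getOrCreate()

// Hypothetical helper: list the data files added or removed by the commits
// in the range (fromVersion, toVersion], read straight from the JSON log.
def changedFiles(tablePath: String, fromVersion: Long, toVersion: Long): DataFrame = {
  // One commit file per version, e.g. _delta_log/00000000000000000001.json
  val commitFiles = ((fromVersion + 1) to toVersion)
    .map(v => f"$tablePath/_delta_log/$v%020d.json")

  // Read each action as a raw JSON line, so that commits without any
  // "add" or "remove" action simply yield nulls instead of a schema error.
  spark.read.text(commitFiles: _*)
    .select(
      input_file_name().as("commit_file"),
      get_json_object(col("value"), "$.add.path").as("added_path"),
      get_json_object(col("value"), "$.remove.path").as("removed_path"))
    .where(col("added_path").isNotNull || col("removed_path").isNotNull)
}

// Assumed table path and versions, for illustration only.
changedFiles("/tmp/delta/t2", 0L, 1L).show(truncate = false)
```

This only narrows down which files to read. A file rewritten by an update or delete still contains unchanged rows, so the rows in the added and removed files would still have to be compared to get a row-level diff.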


Please note that versions of a delta table are cached (persisted by Spark) so comparing different datasets should be fairly cheap.

val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI

Getting those v0 and v1 isn’t expensive; comparing the two can be both expensive and tricky. If the table is append-only then it’s (v1 - v0); if it’s got upserts then you have to handle (v0 - v1) as well, and if it’s got metadata or protocol changes it gets even trickier.

And when you do all that logic yourself it’s suspiciously similar to re-implementing DeltaSource.
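
For reference, the (v1 - v0) and (v0 - v1) comparison described above could look roughly like the following. This is only a sketch: it assumes both versions share the same schema, and the names `addedOrUpdated` and `removedOrUpdated` are made up for illustration. Both differences scan the full data of both versions, which is where the cost comes from.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-version-diff").getOrCreate()

val path = "/tmp/delta/t2"
val v0 = spark.read.format("delta").option("versionAsOf", 0).load(path)
val v1 = spark.read.format("delta").option("versionAsOf", 1).load(path)

// Rows present in v1 but not in v0: inserts, plus the new image of updated rows.
val addedOrUpdated = v1.except(v0)

// Rows present in v0 but not in v1: deletes, plus the old image of updated rows.
val removedOrUpdated = v0.except(v1)

addedOrUpdated.show()
removedOrUpdated.show()
```

Note that `except` has set semantics (like SQL EXCEPT DISTINCT), so duplicate rows collapse; `exceptAll` keeps duplicates if that matters. Telling an update apart from a delete followed by an insert would additionally require joining the two results on a key column, which this sketch does not attempt.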


You may then consider the following:

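import org.apache.spark.sql.delta.DeltaLog

// DeltaLog is an internal Delta API and may change between releases
val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state  // actions that make up the table state at version 0

val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state  // actions that make up the table state at version 1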

actionsAtV0 and actionsAtV1 are all the actions that brought the Delta table to versions 0 and 1, respectively, and can be considered a CDC of the Delta table.

That's basically reading the transaction log, except that it uses some of Delta's internal APIs to make that easier.

Answered 2020-01-04T14:19:19.580

Databricks recently added Change Data Feed (previously known as Delta Change Data Capture), and it seems to directly address this use case:

https://docs.databricks.com/release-notes/runtime/8.2.html#incrementally-ingest-updates-and-deletions-in-delta-tables-using-a-change-data-feed-public-preview

The change data feed of a Delta table represents the row-level changes between different versions of the table. When enabled, the runtime records additional information regarding row-level changes for every write operation on the table. You can query these changes through SQL and DataFrame and DataStream readers. The feed enables:

  • Efficient downstream consumption of merge, updates, and deletes. Getting rows that were updated, inserted, or deleted greatly improves the performance of the downstream job consuming the output of the merge as entire files need not be processed and deduplicated now.
  • Maintaining sync between replicas of two different tables representing the same data. It is a common practice to maintain two versions of the same table, one narrow table as the source of truth and a wider table with additional data. Changes can be efficiently applied from the narrow table to the wider table.
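
As a rough illustration of how the feed is queried through the DataFrame reader, here is a sketch assuming a runtime with Change Data Feed support and the /tmp/delta/t2 table path used earlier on this page. The version numbers are placeholders: changes are only recorded for commits made after the feed is enabled, so the starting version has to be at or after the commit that enabled it.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-cdf-read").getOrCreate()

// Enable the change data feed on an existing table (assumed path).
spark.sql(
  "ALTER TABLE delta.`/tmp/delta/t2` " +
  "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

// Placeholder versions; pick versions committed after the feed was enabled.
val startVersion = 2L
val endVersion = 3L

val changes = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", startVersion)
  .option("endingVersion", endVersion)
  .load("/tmp/delta/t2")

// Besides the table's own columns, each row carries change metadata:
// _change_type is insert, update_preimage, update_postimage, or delete.
changes
  .select("_change_type", "_commit_version", "_commit_timestamp")
  .show(truncate = false)
```

Compared with diffing two time-travel snapshots, this reads the row-level changes that were recorded at write time instead of comparing all rows of both versions.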
Answered 2021-04-28T18:12:24.473