Ale or Lager, you have to reconcile your Real-time Operational Data Store anyway
by Ludek Bob Jankovsky, 11-Oct-2019 (ETL PATTERNS)
After a few lagers I will return to the common Real-time Operational Data Store topic. You can brew Lager, you can brew Ale, you can brew Stout, but you will still have to taste whether you have brewed it well. You have to consider the possibility of consistency loss in your system and you have to be ready for it.
The first inconsistency is the initial one, the state of the system before the initial load. Then there could be consistency breaches, down-times of particular services etc. Never get too pampered by the good taste of the first batch, because the second one could turn sour. And rather than end up with sour faces, let's be ready for that. Let's add consistency tests, consistency repairs, and hard repairs to our brewing process.
This article follows previous three in the series:
There is one thing related to the equality of brewing methods in the reconciliation process. While it is much more complicated to brew lager than to brew ale or stout, with reconciliation it is the opposite: it is way easier for lager brewing.
So what is the reconciliation for? You cannot brew good beer without checking the taste. You even have to take some measures to make the taste right, time after time.
Reconciliation has the following functions:
Quality testing - Real-time ETL works exclusively incrementally. There is always a risk of losing some information (e.g. lost signals, lost micro-batches, extraordinary events), so there should be a way to check that everything is in order. The need appears while testing the solution, when checking consistency after data quality doubts (in the process of solving production issues) etc. It is useful to use the transformation metadata to generate scripts that check the extent of differences (or confirm there are none), show the difference detail, i.e. the differing columns, to help find the problem, show the differences on a row-level sample etc.
Quality monitoring - It is reasonable to run a subset of the tasks mentioned above periodically to alert about an inconsistent state of our data. Again, it should be done by generated processes based on the same metadata.
Quality restoring - Reconciliation can be either passive (informing, alerting) or active (repairing differences). Active reconciliation can cover the task of the initial load too (hard reconciliation), and it can be done as a non-invasive reconciliation that reconciles data without down-time of your solution (smooth reconciliation).
All the ways mentioned above can (and should) be generated from the same transformation metadata and should be implemented in your solution to get a reliably good taste.
The first question we have to answer is - where do we get source data for the reconciliation? The picture above shows four various sources of data for the reconciliation of our ETL processes.
[1] - Load source table for reconciliation. This is the most elementary method, useful especially for CDC-based Lager-wise ETL patterns. We integrate the ODS based on CDC events on the level of physical tables in the source. That way the reconciliation source has the same shape as the real-time source and we can use the same transformation metadata for generating the reconciliation process. That is an element of the beer equality: as much more complex as the lager-wise ODS is in comparison to the ale or stout one, its reconciliation is just straightforward.
[2] - Load and transform source tables for reconciliation. This way is trickier. We do the real-time integration based on messaging initiated either from the UI or by the source of records, but we do the bulk reconciliation transformation totally sideways. On one hand it is a duplication of the transformation logic, on the other hand it makes testing more effective because it can catch errors in the original transformation. It simply supports a real double check from the logical design phase on, at the cost of additional effort in the design phase and in each redesign.
[3] - Use "Fake shake" of the source. - A cool way to get reconciliation data. We use the same infrastructure to get data into our target, just like in the real-time processing. We just do that for all records in the source. That can be realized as a fake event chain for all records or by surrogate filling of the queue on the source-of-records side. The fake shake can also be used for internal transformations within layers of the ODS, but you have to disable early filtering for that purpose.
[4] - Data from compacted messaging topics. - Most current messaging systems offer an option of compacting topics. That way the last state of each keyed value of the topic is stored and can be used as a source for reconciliation.
[5] - Internal reconciliation - Aside from external reconciliation between source systems and the Real-time Operational Data Store, internal glitches and inconsistencies are possible as a result of leakages in between-layer transformations. That can happen when the identification process misses some changes, either because of a definition error or as a result of some rare coincidence of events. Internal reconciliation can also be necessary as a follow-up to the external one when using methods that do not allow chaining of real-time processing, or during initial loads.
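To make source [4] a bit more tangible, here is a minimal sketch of what topic compaction gives us: the last state per key. The message shape (key, value) and the use of None as a delete marker are assumptions made for this illustration, not a description of any particular messaging system.

```python
from typing import Iterable, Optional

# Hypothetical message shape: (key, value). A value of None is assumed
# to be a delete marker (tombstone) for that key.
Message = tuple[str, Optional[dict]]


def compact(log: Iterable[Message]) -> dict[str, dict]:
    """Fold an ordered message log into the last known state per key."""
    state: dict[str, dict] = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # delete marker removes the key
        else:
            state[key] = value    # later messages overwrite earlier ones
    return state


log = [
    ("acc-1", {"balance": 100}),
    ("acc-2", {"balance": 50}),
    ("acc-1", {"balance": 80}),
    ("acc-2", None),
]
snapshot = compact(log)  # what a reconciliation would read as its source
```

The resulting snapshot has the shape of a table keyed by the message key, so it can be compared against the target like any other reconciliation source.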
Reconciliation patterns - primitive
Here we return to the early times of Data warehousing ETL patterns, where we simply take data from the source and merge them into the target, either with or without deleting orphan records in the target. That approach is simple, and if neither an internal nor an external factor prevents us, then why not? These patterns are very efficient for initial loads in bulk mode, when we can afford downtime before the start of the system and we need to minimize the time of the initial load.
Overwrite pattern
This pattern feels primitive even among primitive patterns. Data in the target table are truncated before inserting the whole new set of data from the source. It is the fastest way to fill an empty system at initial load. It has huge disadvantages for reconciliation loads that are not initial: we lose information about the change timestamp and we cannot identify what has changed after the reconciliation, if anything. That way the testing factor, evaluating whether the reconciliation was even needed, falls apart with that approach.
Full refresh pattern
Functionally, the Full refresh pattern is similar to the previous one. We do the following things:
insert records missing in the target
update records that differ in the target, based on the source
delete (soft delete) records which are in the target and are not in the source.
Similarly to the previous pattern, this one requires the full source dataset for the deleting part.
Unlike the previous pattern, it minimizes the volume of changes and it can also support audit attributes, e.g. inserted/updated timestamp, deleted flag (for soft delete) etc. This approach also allows chaining real-time replication after the bulk reconciliation process.
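The three operations of the Full refresh pattern can be sketched in a few lines. This is an in-memory illustration only; the row layout (data, inserted_ts, updated_ts, deleted) and the function name are assumptions, and a real implementation would be a set-based statement generated from metadata.

```python
from datetime import datetime, timezone


def full_refresh(target: dict, source: dict, now: datetime) -> dict:
    """Merge a FULL source dataset into target; return counts of changes."""
    stats = {"inserted": 0, "updated": 0, "deleted": 0}
    for key, data in source.items():
        row = target.get(key)
        if row is None:
            # insert records missing in the target
            target[key] = {"data": data, "inserted_ts": now,
                           "updated_ts": now, "deleted": False}
            stats["inserted"] += 1
        elif row["data"] != data or row["deleted"]:
            # update records that differ (or revive soft-deleted ones)
            row.update(data=data, updated_ts=now, deleted=False)
            stats["updated"] += 1
    for key, row in target.items():
        # soft delete records that are in the target but not in the source
        if key not in source and not row["deleted"]:
            row.update(deleted=True, updated_ts=now)
            stats["deleted"] += 1
    return stats


now = datetime(2019, 10, 11, tzinfo=timezone.utc)
target = {
    1: {"data": "ale", "inserted_ts": None, "updated_ts": None, "deleted": False},
    2: {"data": "stout", "inserted_ts": None, "updated_ts": None, "deleted": False},
}
source = {1: "pale ale", 3: "lager"}
first_run = full_refresh(target, source, now)
second_run = full_refresh(target, source, now)
```

Because only real differences are touched, a second run over the same source reports zero changes, which is exactly the testing factor the Overwrite pattern cannot give us.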
Diff merge pattern
The Diff merge pattern is similar to the previous one except for the deleting part. That makes it useful even in cases when we do not have the full source dataset. Similarly to the Full refresh pattern, it allows chaining real-time processing behind the bulk operation and it supports all the audit attributes too. Sometimes it can be used in combination with the Full refresh pattern: you choose the pattern based on the type of source extract coming in, and you plan data extraction in a cyclic pattern (e.g. F - i - i - i - i - i - F - i - i - i - i - i - F...).
External and internal factors of the reconciliation process
There are various external factors of the reconciliation process depending on the data source, its type, content, and performance limitations as well as internal factors depending on the Operational data store business continuity, SLA, and ETL logic.
External factors:
Data source and technology - as mentioned above.
Full vs. Partial vs. Incremental source dataset - the type of dataset can significantly impact the set of possible load methods. Incremental or partial datasets can be used either to reduce data transfer or simply because full data are not available in the source, as with a compacted topic or because of an aging strategy (ILM) in the source.
Full loads contain all data in the source and can be used for full refresh strategies, where we (soft) delete records in our copy that are missing in the incoming dataset.
Incremental loads contain only the changed set of data. These can be either flat, containing the new values of changed data, or they can contain more information about the change, including information about deleted records. In the case of flat incremental data we have no information supporting the delete operation in our store.
Partial loads contain only data of some time period, e.g. transactions from a certain business day, the current snapshot of balances etc.
Load impact sensitivity - despite the availability of full data for reconciliation, there could be performance limitations (massive read operations in the source, locks, read consistency and old snapshots ...). Sometimes the availability of source data could be limited to a small time window outside business hours.
Data ageing - data could age in the source at a different pace than our data in the ODS, so we have to consider that factor.
Internal factors:
Our SLA - the most important internal factor with regard to our customers' experience and requirements. Real-time operational systems usually work 24x7, and although there is sometimes a traffic difference between working and idle hours (unless we work in a multi-time-zone environment, where things are even harsher), we cannot simply plan regular down-time. This factor forces us either to make everything so reliable that we would not need regular reconciliations (lovely fairy-tale), or to do reconciliation in a way that requires no downtime of consumer availability (partitioned methods) and, best of all, no downtime of real-time loads either (smooth methods).
Data ageing again - Although in theory it should not be in operational data stores, time after time it happens that we cover some event or transactional data in our systems, and we have to consider that factor too.
Dependencies - After reconciling information in any layer of ODS data we have to consider the dependencies in all following layers. For example, when we reconcile our mirror of source table X, we have to go through all dependencies and reconcile all impacted tables too.
Reconciliation patterns - partitioned
The following group of reconciliation patterns uses the ability of database engines to exchange partitions. The source data are inserted into a stage in the same format as the target table partition and the content is then exchanged. That approach is usually faster than the primitive processes and places fewer requirements on the load window in which data are not available to consumers.
Engines usually allow only the exchange of a partition vs. a table, so we have to build the PARTITION MIRROR (the table we will exchange at the end) as a single table if the target is partitioned, or as a partitioned table if the target table is not partitioned.
PEP - Primitive Exchange Partition pattern
The Primitive Exchange Partition pattern is a counterpart of the Overwrite pattern in the world of the partitioned approach. It is more powerful, though, as it combines the advantage of a fast load into an empty structure with the possibility for consumers to use the previous data meanwhile. All other disadvantages, such as the inability to support audit attributes and soft delete and to chain real-time processing behind it, stay in place.
In the PEP pattern we create an exact empty structural copy of the target partition, fill it from the source by the Overwrite pattern, synchronize indexes and then exchange it in no time with the target partition.
Difficulties could appear with global indexes, so the best solution is to avoid global indexes on tables with PEP used, or to use one of the solutions described below.
Unlike the Overwrite pattern, it allows including a testing step (the answer to the "Was the reconciliation necessary?" question) by comparing the previous and the exchanged content of the tables. That way we could also create a workaround allowing to chain real-time processing after the reconciliation process.
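The flow of the PEP pattern, including the testing step, can be simulated in memory. In this rough sketch a table is a dict of partition name to a list of row tuples and the exchange is a plain reference swap; the function names are hypothetical and the index synchronization is left out, so it only illustrates the order of steps, not a database feature.

```python
def exchange_partition(table: dict, partition: str, mirror: list) -> list:
    """Swap the mirror content into the partition; return the previous content."""
    previous = table[partition]
    table[partition] = mirror
    return previous


def pep_reconcile(table: dict, partition: str, source_rows) -> bool:
    """Run the PEP steps; return True if the reconciliation was necessary."""
    # Fill the empty structural copy by the Overwrite pattern. Consumers
    # keep reading the old partition content during this (slow) step.
    mirror = list(source_rows)
    # The exchange itself is the only moment the target is touched.
    previous = exchange_partition(table, partition, mirror)
    # Testing step: compare the previous and the exchanged content.
    return sorted(previous) != sorted(mirror)


table = {"2019-10-10": [(1, "a"), (2, "b")]}
needed_first = pep_reconcile(table, "2019-10-10", [(2, "b"), (1, "a")])
needed_second = pep_reconcile(table, "2019-10-10", [(1, "a"), (2, "c")])
```

The first call swaps in identical content and reports that the reconciliation was not necessary; the second one finds a difference.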
REP - Rich Exchange Partition pattern
The second issue, with indexes, can also be solved. As much as I dislike global indexes on partitioned tables and would say "get rid of them, replace them with locals", I can also see the point of them, so I offer a solution applied during the process:
Create a local index with the same structure as the global one, with just two differences:
It will be local.
The first column will be defined as desc to avoid the technical limitation on indexes of "exactly" the same structure. Some database engines dislike those.
Drop the global index.
Synchronize indexes for the PARTITION MIRROR.
Exchange the partition.
Recreate the global index.
Drop the redundant local index.
Reconciliation patterns - advanced
Advanced patterns just attach some additional information or logic to the previous patterns to extend their functionality.
One good example is:
Isolated Full Refresh pattern - This pattern is similar to the standard Full refresh pattern, but it can be used even in situations when we do not have the full source dataset.
Chaining reconciliation processes
There are two ways to chain dependent reconciliations:
Real-time chaining is the best solution for non-initial and especially for Smooth reconciliation. In that case we rely on signals of changes in our tables and let the real-time brewing finish the work on dependent tables.
Workflow chaining is the best solution for initial loads, as we pre-load the tables layer by layer following the reconciliation workflow dependencies. That could be a bit tricky around loop-backs, when a table is filled from data of another table in the same layer, not to mention self-loops. These anomalies need special consideration in the reconciliation workflows.
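For workflow chaining, the order of reconciliation can be derived from table dependencies. Below is a small sketch using Python's standard graphlib module; the table names and the dependency map are made up for the example. Circular dependencies between tables surface as a CycleError, which is one way to find the anomalies that need special handling.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical dependency map: table -> set of tables it is loaded from.
deps = {
    "L0_CUSTOMER": set(),
    "L0_ACCOUNT": set(),
    "L1_CUSTOMER_ACCOUNT": {"L0_CUSTOMER", "L0_ACCOUNT"},
    "L2_CUSTOMER_SUMMARY": {"L1_CUSTOMER_ACCOUNT"},
}


def reconciliation_order(dependencies: dict) -> list:
    """Return tables ordered so that every table follows its sources."""
    return list(TopologicalSorter(dependencies).static_order())


def find_loop_back(dependencies: dict):
    """Return a detected dependency cycle, or None if the workflow is clean."""
    try:
        reconciliation_order(dependencies)
    except CycleError as err:
        return err.args[1]  # list of nodes forming the cycle
    return None


order = reconciliation_order(deps)
loop = find_loop_back({"A": {"B"}, "B": {"A"}})
```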
Rich Exchange Partition pattern is a complex solution offering better performance with the same functionality as the Full refresh pattern. The major advantage is faster processing of huge differences or new structures, without impact on the state of the previous data while loading. The pattern fits best when we need all the functionality of Full refresh without jeopardizing target data during the import of raw input data.
Technical issues and solutions
There are two technology-related hardships with partitioned patterns:
Partition exchange is possible only in the PARTITION - TABLE mode.
Partition exchange invalidates global indexes. We cannot afford invalid indexes in a 24x7 real-time system.
The first issue can be solved the way mentioned above, by defining the PARTITION MIRROR either as a single table for a partitioned target or, inversely, as a partitioned table with one partition for a non-partitioned target, in the following way:
The pattern is based on the assumption that if we do not have the full extent of data, we also know why we do not have it: there exists some rule or measurable information we can use.
There could be various kinds of limitation:
Business day - when data of transactions, events or time snapshots are organized in daily partitions (either physically or logically), you can compare just a subset of source data against a subset of target data. In this case, the subset is defined by parameters of the process.
Dynamic timestamp check - when data are incremented by "last changed", the oldest relevant timestamp can be extracted from the source data. Let's call that timestamp T0. We do all inserts and updates based on the incoming data, and delete (mark as deleted) only those records where the relevant source timestamp is newer than T0.
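The dynamic timestamp check can be illustrated with a short in-memory sketch. The row layout (data, src_ts, deleted), the integer timestamps and the function name are assumptions of this example; the rule itself follows the description above: T0 is the oldest timestamp in the increment and only target records newer than T0 are candidates for soft delete.

```python
def isolated_full_refresh(target: dict, increment: dict) -> int:
    """Apply an incremental dataset; return the number of soft-deleted rows.

    increment maps key -> (data, source_last_changed_ts).
    """
    if not increment:
        return 0  # nothing arrived, so we know nothing about deletes
    # T0: the oldest relevant timestamp found in the incoming data.
    t0 = min(ts for _, ts in increment.values())
    # Insert or update everything that came in.
    for key, (data, ts) in increment.items():
        target[key] = {"data": data, "src_ts": ts, "deleted": False}
    # Soft delete only inside the window covered by the increment.
    deleted = 0
    for key, row in target.items():
        if key not in increment and row["src_ts"] > t0 and not row["deleted"]:
            row["deleted"] = True
            deleted += 1
    return deleted


target = {
    1: {"data": "a", "src_ts": 10, "deleted": False},   # older than T0, untouched
    2: {"data": "b", "src_ts": 120, "deleted": False},  # inside window, missing
    3: {"data": "c", "src_ts": 130, "deleted": False},  # inside window, updated
}
increment = {3: ("c2", 150), 4: ("d", 110)}
deleted_count = isolated_full_refresh(target, increment)
```

Record 1 is older than T0 (110 here), so its absence from the increment proves nothing and it stays untouched.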
Log-wise increments
The log-wise pattern is a typical case of simulated real-time or message input in the Fake shake scenarios, but it could also be fed from an archived log of source data changes when one is available. The log contains a sequence of changes including the change operation and the timestamp of the change, so it usually contains the DELETE operation too. The major problem is that there could be duplicates in the data, and the timestamp sequence has to be considered.
The steps to the target are the following:
Reduce duplicates by always keeping just the newest record of each instance. The only exception is extracting the timestamp from insert duplicates to preserve the Inserted timestamp.
Process [IU] operations like DIFF MERGE.
Process [D] operations by marking target records as deleted.
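The three steps above can be sketched as follows. The log entry shape (key, operation, timestamp, data) and the target row layout are assumptions for the illustration; the log is deliberately out of order to show why the timestamp sequence matters.

```python
def reduce_log(log):
    """Keep the newest entry per key; remember the earliest insert timestamp."""
    newest = {}
    inserted_ts = {}
    for key, op, ts, data in log:
        if op == "I" and (key not in inserted_ts or ts < inserted_ts[key]):
            inserted_ts[key] = ts
        if key not in newest or ts > newest[key][1]:
            newest[key] = (op, ts, data)
    return newest, inserted_ts


def apply_log(target: dict, log) -> None:
    newest, inserted_ts = reduce_log(log)
    for key, (op, ts, data) in newest.items():
        row = target.get(key)
        if op in ("I", "U"):
            # [IU] operations are processed like DIFF MERGE
            if row is None:
                target[key] = {"data": data, "updated_ts": ts, "deleted": False,
                               "inserted_ts": inserted_ts.get(key, ts)}
            elif row["data"] != data or row["deleted"]:
                row.update(data=data, updated_ts=ts, deleted=False)
        elif op == "D" and row is not None and not row["deleted"]:
            # [D] operations mark target records as deleted
            row.update(deleted=True, updated_ts=ts)


log = [
    ("a", "I", 1, "v1"),
    ("a", "U", 3, "v3"),
    ("a", "U", 2, "v2"),   # arrives late, must not win over ts=3
    ("b", "I", 1, "x"),
    ("b", "D", 4, None),
    ("c", "D", 5, None),   # unknown key, nothing to mark
]
target = {"b": {"data": "x", "inserted_ts": 0, "updated_ts": 0, "deleted": False}}
apply_log(target, log)
```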
Smooth reconciliation
This chapter shows the real viciousness of my nature. After a long, boring foreplay of clear, well-known information we have got to the real topic. But there is no treasure without a long way and some traps ...
Reconciliation is a common task of a Real-time Operational Data Store. And the common availability of a Real-time Operational Data Store is 24x7. So there is no space for reconciliations requiring downtime of any services.
That is the reason why Smooth reconciliation, which requires no downtime, exists and is an almost obligatory part of a real Real-time Operational Data Store.
The trick is that Smooth reconciliation does not resolve all data differences, but only those which appeared before time T0, i.e. before the start of loading source data for the current reconciliation.
Informative reconciliation
I have to admit we have slightly bent the word RECONCILIATION to cover the combination of getting information about differences and repairing these differences. It is always important to keep statistics and logs about the numbers of differences solved by reconciliation, as the need to repair data could be a consequence of some failure in the brewing process. We can log changed records based on timestamp after the reconciliation, we can store snapshots of the DELTA TABLE etc.
We should also include an informative reconciliation process for the internal reconciliation, so we can learn whether our transformation is well developed (which it usually is when we generate the test reconciliation from the same metadata as the transformation itself), but more importantly whether the process of generating event signals works and whether there are problems with the deployment of the newest versions. It is reasonable to gather statistics of:
Numbers of differences in the structure MISSING - REDUNDANT - DIFFERENT
Number of differences per column of data
Details of differing records (or just a head() like sample)
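The three kinds of statistics listed above can be gathered in one pass. The following is a minimal in-memory sketch with hypothetical names and sample data; in a real solution the same thing would be a generated set-based query.

```python
from collections import Counter


def diff_stats(source: dict, target: dict, sample_size: int = 5):
    """Compare two keyed datasets of column dicts.

    Returns (structure counts, per-column counts, sample of differing rows).
    """
    structure = Counter()
    per_column = Counter()
    sample = []
    for key in sorted(source.keys() | target.keys()):
        if key not in target:
            structure["MISSING"] += 1      # in source, not in target
        elif key not in source:
            structure["REDUNDANT"] += 1    # in target, not in source
        else:
            cols = sorted(c for c in source[key]
                          if source[key][c] != target[key].get(c))
            if cols:
                structure["DIFFERENT"] += 1
                per_column.update(cols)
                if len(sample) < sample_size:  # a head()-like sample
                    sample.append((key, cols))
    return structure, per_column, sample


source = {
    1: {"name": "ale", "abv": 5.0},
    2: {"name": "lager", "abv": 4.5},
    3: {"name": "stout", "abv": 6.0},
}
target = {
    2: {"name": "lager", "abv": 4.4},
    3: {"name": "stout", "abv": 6.0},
    4: {"name": "porter", "abv": 5.5},
}
structure, per_column, sample = diff_stats(source, target)
```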
Trust me, you will feel much more confident with a well designed reconciliation in your Real-time Operational Data Store.
Prerequisites - the following prerequisites are necessary for Smooth reconciliation:
Last Change Timestamp - every table in the chain of reconciliation should contain a last change timestamp on the record level. We get the best result when the timestamp contains the time of the change in the source system, but the "target" timestamp is also sufficient. The timestamp is important to prevent the bulk reconciliation process from overwriting newer real-time information.
Rigid soft delete - We have written about SOFT DELETE as a method where deleted records are not physically deleted but marked as deleted by a special flag. Unlike the standard SOFT DELETE, in RIGID SOFT DELETE we import even records considered deleted in the source (instead of ignoring them) and we insert these records with the "deleted flag" set.
Transaction isolation - READ COMMITTED works well. For other isolation levels you have to check feasibility, as I am not sure about them now.
Process - you can smoothly reconcile your data with the following steps:
[1] Conserve the T0 timestamp - before you start reading from the source you have to set the current T0 timestamp. In some architectures you have to decrease it by a small safety delta. The bigger the delta is, the more efficiency the reconciliation loses on the most recent differences. However, you have to be sure you read a state of source data in which all information with an earlier timestamp has already been submitted.
[1a] Make a carbon copy of all source data into the MIRROR TABLE in the RT ODS L0 layer. The mirror table should be designed plain, with no indexes, so you can use the fast direct OVERWRITE pattern to fill it.
[2] Calculate a differential DELTA DATASET registering all differences between the mirror table and the target table. The type of difference should be registered for each record of the dataset:
I .. Data are in the MIRROR but they are missing in the TARGET.
U .. Data are both in the MIRROR and in the TARGET, but they are different. That also covers the "revive" situation, when the record in the target is marked as DELETED.
D .. Data are missing in the MIRROR (only for full import or with additional subsetting logic) and they are not marked as deleted in the target.
We also do an early filtering, removing all records where the Target Last Change Timestamp is bigger than T0. The whole step should be done set-based, because we work with a huge amount of data and a row-based or bulk-based approach would slow down our solution.
[3] Apply the DELTA DATASET to repair the TARGET TABLE. This operation is the trickiest one, and it is what allows Smooth reconciliation to run batch reconciliation and real-time integration at the same time. The success factors are the following:
We work with a small number of differences in DELTA. All the expensive comparison of huge data has been done set-based in the previous step, so there should be just a fraction of differing records. Smooth reconciliation is not designed for initial loads! That way we can process every difference record fine-grained - row-based.
We do it in small transactions, avoiding possible locks that could slow down the standard real-time process.
We include a safe clause in each DML operation, allowing it to change only records where the Last Change Timestamp is smaller than T0. That way we eliminate the risk of overwriting real-time integrated information with older batch data. It is better to miss some recent difference than to make data even more inconsistent. The check is not redundant: although we early-filtered differences in the previous step, it has to be done again at the time of the DML operation.
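Steps [2] and [3] can be tried out end to end with Python's built-in sqlite3 module. Treat the following as a sketch under assumptions: the table and column names are invented, timestamps are plain integers, repaired rows are stamped with T0, and SQLite only stands in for whatever engine your ODS runs on. The delta is computed by one set-based statement with early filtering, then applied row by row in small transactions with the safe clause repeated in every DML.

```python
import sqlite3

# Step [2]: set-based comparison of MIRROR and TARGET with early filtering.
DELTA_SQL = """
SELECT m.id, CASE WHEN t.id IS NULL THEN 'I' ELSE 'U' END, m.data
FROM mirror m LEFT JOIN target t ON t.id = m.id
WHERE t.id IS NULL
   OR (t.last_change_ts < :t0 AND (t.deleted = 1 OR t.data <> m.data))
UNION ALL
SELECT t.id, 'D', NULL
FROM target t LEFT JOIN mirror m ON m.id = t.id
WHERE m.id IS NULL AND t.deleted = 0 AND t.last_change_ts < :t0
"""


def compute_delta(conn, t0):
    return conn.execute(DELTA_SQL, {"t0": t0}).fetchall()


def apply_delta(conn, delta, t0):
    """Step [3]: one small transaction per record, safe clause in each DML."""
    changed = 0
    for key, op, data in delta:
        with conn:  # commits on success, rolls back on error
            if op == "I":
                # A row inserted meanwhile by real-time wins: IGNORE keeps it.
                cur = conn.execute(
                    "INSERT OR IGNORE INTO target (id, data, last_change_ts, deleted) "
                    "VALUES (?, ?, ?, 0)", (key, data, t0))
            elif op == "U":
                cur = conn.execute(
                    "UPDATE target SET data = ?, last_change_ts = ?, deleted = 0 "
                    "WHERE id = ? AND last_change_ts < ?", (data, t0, key, t0))
            else:
                cur = conn.execute(
                    "UPDATE target SET deleted = 1, last_change_ts = ? "
                    "WHERE id = ? AND last_change_ts < ?", (t0, key, t0))
            changed += cur.rowcount
    return changed


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE mirror (id INTEGER PRIMARY KEY, data TEXT NOT NULL);
CREATE TABLE target (id INTEGER PRIMARY KEY, data TEXT NOT NULL,
                     last_change_ts INTEGER NOT NULL, deleted INTEGER NOT NULL);
INSERT INTO mirror VALUES (1, 'a'), (2, 'b2'), (3, 'c');
INSERT INTO target VALUES (2, 'b', 50, 0), (3, 'c-new', 150, 0), (4, 'd', 10, 0);
""")
T0 = 100
delta = compute_delta(conn, T0)
# Simulated real-time load touching row 2 between steps [2] and [3].
conn.execute("UPDATE target SET data = 'b3', last_change_ts = 120 WHERE id = 2")
conn.commit()
changed = apply_delta(conn, delta, T0)
```

Row 3 never enters the delta because of the early filter, and row 2 is in the delta but is skipped at apply time because the real-time change arrived in between. That second case is the reason the safe clause has to be repeated in the DML.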