About Real-time Operational Data Store and Beer brewing
by Ludek Bob Jankovsky, 14-Jul-2019 (ETL PATTERNS)
Speaking about ODS nowadays, we usually mean the real-time or nearly real-time one; that is what makes it operational, by the way. In the past times of Business intelligence patterns there were concepts using the term for exaggerated staging areas of Data warehouses providing early access to D+1 data, but that was just misuse of the term for something else, or reducing its meaning to an operational reporting resource.
Later, the following issues proved these concepts wrong:
Using EDW ETL as a source or driving structure collided with the requirement of up-to-date information.
Ad-hoc requirements for operational reporting interfered with the data quality and design quality requirements of flawless operational stores.
Data Lake appeared to be a better solution for operational reporting.
The requirement of high-quality, highly available, real-time or nearly real-time data means that the new Operational data stores are slimmer but much better monitored and operated.
So when we speak about ODS we mean the real-time one; we speak about beer, not about lemonade, just to be clear.
There are many definitions of what an Operational data store really is; most of them are misleading, as the ODS in the EDW concept plays a different role than current real-time operational data stores.
After designing a few of them (the real-time ones; I do not count the plenty of EDW-like ones I have met), I have learned that an ODS is like a beer.
Why do I say so? Because of the complicated way of brewing needed to get an excellent taste?
Well, the most important association is the way of fermenting. There are two major ways of beer fermenting:
top-fermented beers - Ales
bottom-fermented beers - Lagers
And that is how it is with Operational data stores.
There are either Ale-wise or Lager-wise ones.
Which are better? As people in the pub would say - it depends on what you want to eat with it. Just as there is a proper kind of beer for each food, there is a proper kind of Operational data store for each solution. So first think about what you want to eat, and then choose what to drink with it.
ALE-Wise Operational Data Store
The following image shows an example of an Ale-wise Operational data store. Both feeding and receiving happen on the same logical level (usually ESB messages), so no huge transformation logic is necessary on that "fermenting" path. There are still requirements on managing data consistency (e.g. not to override newer information with an older, delayed one). From the aspects of manageability, performance and development effort this way is the most efficient, as long as it fits the food you are eating.
LAGER-Wise Operational Data Store
The following image shows an example of the Lager-wise (bottom-fermented) Operational data store. The receiving happens on the same logical level (usually ESB messages) as with the previous kind.
The feeding uses Change Data Capture (CDC) technology, which allows reacting to changes in the data providers' databases (raw data) and filling the Operational data store structures based on them.
At first sight it is obvious that we have raw system data at the entrance and logical data at the output. Much of the data source related logic has to be replicated in the ODS, which makes this kind of Operational data store more expensive in manageability and development.
Anyway, there are foods with which Lager is better. It all depends on the ability of the data source to provide information from the "logical top", on the existence of enterprise-wide processes allowing an event to be caught and filled in parallel into both the ODS and the systems of record, etc.
That is why I like Ale more, but when Lager is considered the better solution, it can be done.
Common issues
It is bitter. Both ale and lager are bitter. If you do not want it bitter, try cider or drink a lemonade, but that would be a bit childish.
Anyway, we want an Operational data store, so we have to accept the bitterness.
Because:
There is no loading time slot in which you could dare to leave the system temporarily inconsistent.
There are no workflows allowing you to organize the loading order.
There is not only the inconsistency issue; you also cannot drain resources, because of the possible slowdown of consumers.
There are .... well, it is bitter.
Service Level Agreement
Managing expectations while planning and designing an Operational data store is maybe the most important and most difficult task of all.
You cannot drink a pint of ale in no time without spilling it over your chest. Even though you can do it pretty fast, there are always limitations.
But users ideally want a system with no latency at all. After all, it is a Real-time Operational data store. But there are costs and there are limitations.
You have to work with users, get to realistic needs, and drive expectations based on technology limitations and the costs of the solution.
The first important thing is to distinguish between two levels of SLA, the fermenting time and the drinking time. They are different.
To get back to our Operational data store, let's define the following:
SLA level A (fermenting) - the latency of information on its way into the ODS.
SLA level B (drinking) - the response time of the ODS to user requests.
It is obvious these requirements are not the same. Returning to Data warehouses: they usually have a Level A latency of D+1, but they are supposed to return analytic answers in the range of seconds (Level B).
In the same way, an Operational data store is supposed to answer in milliseconds (Level B), while the loading of data can, depending on technology, be a matter of seconds.
Wrong expectations can lead to wrong use-cases.
The following example shows a Lager-wise Operational data store used for a synchronous answer to a request, with all the back-end processing, CDC integration, ODS transformation logic .... Who wants to wait more than 10 seconds for an application to react nowadays (a realistic estimate after summing up all the little pieces of time in the process)?
Well, Lager is Lager and it takes its time. Fortunately there are Ales with a much more acceptable latency for these cases (we will taste them later). Anyway, when you avoid these synchronous use-cases, you usually do not worry that the disposable balance of your account is 15 seconds old when you check it. That is why it is very important to set up expectations and to force users to specify realistic needs for each kind of data. Once the system grows, scaling issues will appear, and it is always easier to start on the weaker side than to reduce the service level later.
That looks really simple and tasty, but let's look at how consistency control complicates everything.
You have to establish two-phase processing. Every change should first be processed as unconfirmed, and only after the backend system confirms it was successfully processed there does the ODS confirm the information. Under the standard isolation level (READ COMMITTED), only then should the information become visible to ODS consumers.
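Just as a minimal sketch of what such two-phase confirmation could look like on the ODS side, assuming a hypothetical ODS_ACCOUNT_BALANCE table with a pending value and a confirmation flag (all names are illustrative, not a prescribed design):

    -- Phase 1: store the incoming change as unconfirmed; consumers keep reading
    -- the BALANCE column, so the pending value is not visible to them yet.
    MERGE INTO ods_account_balance t
    USING (SELECT :account_id AS account_id,
                  :balance    AS balance,
                  :change_id  AS change_id
             FROM dual) s
       ON (t.account_id = s.account_id)
     WHEN MATCHED THEN UPDATE SET
          t.pending_balance   = s.balance,
          t.pending_change_id = s.change_id,
          t.confirmed         = 'N'
     WHEN NOT MATCHED THEN INSERT
          (account_id, pending_balance, pending_change_id, confirmed)
          VALUES (s.account_id, s.balance, s.change_id, 'N');
    COMMIT;

    -- Phase 2: after the backend acknowledges the change, promote the pending value;
    -- under READ COMMITTED, consumers see the new balance only after this commit.
    UPDATE ods_account_balance
       SET balance   = pending_balance,
           confirmed = 'Y'
     WHERE pending_change_id = :confirmed_change_id;
    COMMIT;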
The following picture shows an example of such collaboration of systems:
Well, the Ale finally
The Ale-wise ODS is a compromise between the Lager-wise and Stout-wise ones. While Stout is more service oriented and works well with micro-services, Ale is more event oriented. In the case of Ale, source systems initiate changes (send messages, fill topics) based on change events happening there. That way they can initiate an event only after it is committed there, so no fake changes get into the ODS. They can also be initially loaded and reconciled by simulation, using the same infrastructure as the standard real-time load.
There is more development on the side of the source systems, but there is still no coupling, as everything works on the basis of fixed, agreed interfaces.
Completeness
Just as we appreciate the full taste of our beer, we expect the data in our Operational data store to be complete - valid. That means nothing has been lost in the loading process and the internal transformations. It sounds standard, but thanks to extra-transaction consistency management it requires some bitter measures. There are many points of potential failure, and a particular message or data change can be lost and never processed. Unlike with a bulk data load, it is not easy to check.
There are the following bitter issues:
Initial load - Even if you trust technologies and events enough to believe your Operational data store will stay complete and consistent when maintained by event-based load, you still have to load it initially. That is easier with Lager-based ones, where besides your CDC you always have your source table, but with Ales it is usually more bitter. There are various ways: you can load initially from a different source than the one the events come from, you can generate artificial events that fill all the structures, etc. Anyway, that is something we have to count on when designing the system.
Data - In the design phase you have to ensure all necessary data sources are covered and loaded. Especially sparsely filled entities and attributes can get omitted and are not easy to test.
Events - In the following phases you have to ensure no events get lost in the process.
Speaking about the initial load, there is always the possibility to generate surrogate events for all source records and provide the initial load by simulation. It can sometimes be expensive in resources, but it uses the same infrastructure as the standard load, and it is not always a bad idea.
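Just as a sketch of such a simulated initial load, assuming a hypothetical EVENT_QUEUE table with OLD/NEW value columns and a source table SRC_CUSTOMER (all names illustrative):

    -- Generate one artificial "insert" event per existing source record.
    -- OLD values stay NULL, NEW values carry the current state of the record,
    -- so the standard real-time infrastructure can process them as usual.
    INSERT INTO event_queue
           (event_id, source_table, operation, change_ts, customer_id, status_new)
    SELECT event_seq.NEXTVAL,
           'SRC_CUSTOMER',
           'I',                       -- treated downstream as a fresh insert
           SYSTIMESTAMP,
           c.customer_id,
           c.status
      FROM src_customer c;
    COMMIT;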
Consistency
Consistency of ODS information is one of the major issues and performance indicators of our solution. No one wants to drink stale beer, do they? Inconsistency can lead to two types of failure:
We publish obsolete data - that is usually related to the latency of the system. As Operational data stores are integrated asynchronously, our data will always be "slightly obsolete", but we can avoid business impediments in that case. Worse cases happen when we lose some signals and do not react to some changes of the source data.
We publish incorrect data - that is what is usually called eventual inconsistency. We refresh part of the information but have not yet refreshed the remaining part. If these changes are dependent, we can get temporarily incorrect information. ODS loads should follow the source system transactional logic as much as possible. Another case is the read-uncommitted situation, when we react to an event happening but not to the rollback of the change in the source.
These are just examples of possible inconsistencies; some are unavoidable and some we have to avoid. Nothing is perfect anyway, and we have to balance the price of the solution against valid requirements (see the expectation management mentioned above).
We have to keep in mind that the ODS load is an integration task working with distributed systems, distributed transactions, unordered messages etc. That is why we have to include extra-transactional consistency mechanisms, such as Saga, dirty load (last takes all) etc., as sketched below.
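A minimal sketch of the "last takes all" (dirty load) guard, assuming the incoming event carries the source-side change timestamp and the ODS row stores the timestamp of the last applied change (names are illustrative):

    -- Apply the incoming change only if it is not older than what is already stored,
    -- so a delayed message cannot override newer information.
    MERGE INTO ods_customer t
    USING (SELECT :customer_id   AS customer_id,
                  :status        AS status,
                  :src_change_ts AS src_change_ts
             FROM dual) s
       ON (t.customer_id = s.customer_id)
     WHEN MATCHED THEN UPDATE SET
          t.status        = s.status,
          t.src_change_ts = s.src_change_ts
      WHERE s.src_change_ts >= t.src_change_ts
     WHEN NOT MATCHED THEN INSERT (customer_id, status, src_change_ts)
          VALUES (s.customer_id, s.status, s.src_change_ts);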
Reconciliation
Do you believe your ODS will stay consistent based on event-based (real-time) loads only? I think the system should be designed so that you can trust it.
But there are several factors that can cause you to spill your beer:
Initial state - we spoke about it in the chapter on the initial load. You roll out your system and there are still no events signaling the initial changes.
Downtime - it can happen; no system has 100% availability, and here we have several systems including the middleware infrastructure etc.
Service data modifications - some errors in source data can be repaired in a non-standard way, so the corresponding events might never be registered.
Transformation bugs - we live in agile times; there are no changeless, perfectly tuned components, and our ODS and its loads keep evolving. Especially after roll-outs of new functionality, glitches can appear. These have to be detected and solved.
Untasted flavors - all these transformations and event processing make up a very complex system which can never be tested completely. You can never simulate all possible sequences and causalities to be sure something new will not happen. That way reconciliation serves as a monitoring component too.
These are the tasks we solve by reconciliation:
Quality assurance - In complex event-based transformation processing there are several points of failure causing inconsistency. Reconciliation is one of the end-to-end ways to monitor the system and alert operators in the case of glitches (failing processes etc.).
Repair of data - Another task of the reconciliation process is repairing inconsistent data.
Part of the standard process - Not all real-time transformations can be triggered by events. Sometimes (ideally never) we cannot identify the deletion of records, and the only way could be temporary inconsistency with regular reconciliation. It can also be involved as a workaround in the process of bugfixing.
In that case no two-phase confirmation is needed. The Operational data store gets only confirmed information (confirmed by the source of record). There is one drawback - the backend system has to take care of sending the event information about the change for the ODS. Let's look at how these swim-lanes look in an example similar to the one at STOUT (i.e. the case where an event is issued externally by the frontend or another source, which is not always necessary with ALE-wise integration):
Shall we mix?
It all depends on the scope and size of your solution. Each mentioned integration type requires some effort. Concentrating on one of them will simplify the final solution and reduce the time to life. Making a hybrid solution will allow you to optimize the ways of integration based on latency and SLA requirements and on the possibilities on the side of the source systems. Honestly, Lager brewing is a very complex thing, so extending it with Ale or Stout-wise capabilities should not be as hard. On the other hand, mixing CDC into a service-oriented infrastructure would be a desperate path, a last resort. Ales are not so bad once you get used to them.
Considering there is middleware consuming data from the ODS, it could be reasonable to develop support for top-fermented brewing too. That would reduce latency for critical tasks and extend the usability of the solution. Especially with a STOUT-wise ODS you have to solve initial loads and reconciliation, often based on raw data, so an existing bottom-fermenting infrastructure can be helpful.
On the other hand, if you do not need CDC integration, if you can avoid coupling of systems, and if you do not have legacy systems with limited possibilities of ESB integration, stay with Ale or Stout.
Early filtering
Real-time processing is very expensive compared to bulk loads. Aside from the transformation itself, you have to manage change events, process the full impact of every change and assure consistency, all in almost no time. Well, you can avoid all the workflow-like load process management, but it does not help much.
There are things that help to reduce costs (resource consumption), which can generally be called early pruning, or early filtering.
It appears in many places during the ODS load or reconciliation process; a sketch follows the list below.
Accept only "real" events caused by real changes at the source (in the scope of data from the source you use for the ODS load).
Filter out events that change only data not appearing in your transformation.
De-duplicate redundant events as soon as possible.
Eliminate "no change" data manipulation.
Reduce transformation data reads to the necessary records (row-based approach).
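A minimal sketch of two of these filters applied to a hypothetical EVENT_QUEUE table carrying OLD/NEW values (names are illustrative): events whose relevant columns did not really change are discarded, and the rest are de-duplicated per target key.

    -- DECODE treats two NULLs as equal, so "1" means the value really changed.
    INSERT INTO event_filtered (customer_id, change_ts)
    SELECT e.customer_id, MAX(e.change_ts)
      FROM event_queue e
     WHERE DECODE(e.status_old,  e.status_new,  0, 1) = 1   -- a relevant column changed
        OR DECODE(e.segment_old, e.segment_new, 0, 1) = 1
     GROUP BY e.customer_id;                                 -- de-duplication per key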
That way you start with a big number of events, but during the process you reduce the effort. That is especially useful in recursive processing, when changes of a certain entity can cause (or at least signal an event for) a change of the same entity.
Early filtering is one of the most important means of success when building an ODS, real-time loads, recursive transformations etc. It might look simple, but it pays off. You cannot brew tasty beer without filtering; do not trust the "unfiltered" ones.
Smooth reconciliation
Reconciliation normally needs downtime of the Operational data store, or at least of the standard (event-based, real-time) loads. That is impossible during working hours, so after one of the reconciliation reasons mentioned above it is usually necessary to wait with slightly inconsistent data until the end of working hours. Sometimes working hours never end, especially in global solutions. And as I mentioned above, we need to reconcile from time to time. If only there were a way of reconciling smoothly, without downtime of the standard processes ....
Well, there is a way: smooth reconciliation. I mention it intentionally after the Early filtering chapter, because the filtering is a necessary prerequisite of this way of reconciliation. By the way, unfiltered beer is anything but smooth.
Another prerequisite of smooth reconciliation is the existence of exact and reliable audit information about updates (real updates, not those no-change fake updates) on a row basis.
Then we can start the following process (a sketch of the set-based comparison follows below):
Mark the timestamp of the start of the reconciliation; let's call it T0 further on. That is necessary because, after we start, data can change and real-time loads can change it in the ODS, so those are differences we will have to ignore later.
Load exact copies of the ODS stage tables (further Mirror tables) based on the current source data. We cannot load them into the standard stage, because the data of the standard stage are influenced by the real-time loads.
Compare the Mirror tables with the stage ones. We can ignore all records without a difference and stick to the records that differ. Based on the early filtering principle, we can also ignore differing records when the record change (based on the record update timestamp) happened after T0. It can also be useful to register a hypothetical operation - the kind of difference (I .. exists in Mirror but not in Stage, U .. exists in both, D .. exists in Stage but not in Mirror). The comparison has to be done SET-BASED (one statement) because of performance. As a success factor we expect the reduction of data during the comparison to be really significant, so the number of differing (suspicious) records is really small.
Repair the Stage and generate events. This part is really tricky. We have to consider that Stage records can be changed by the real-time process during the reconciliation. The whole smooth reconciliation can be slow, and we have to make sure this step will not slow down the standard real-time processing. There is a way. We have to do it ROW-BASED in the following steps:
Lock a particular record with a difference identified in the previous step.
Compare the record again and check the update timestamp. If the update timestamp is newer than T0 or there is no longer a difference, ignore it. Otherwise update the Stage record based on the Mirror one and generate an event.
Unlock the record.
The final comparison is necessary to make sure the record has not been changed after the previous SET-BASED comparison.
Another prerequisite of smooth reconciliation is an internal event-oriented architecture. Transformations of the following layers should be triggered by changes in the previous layers.
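A minimal sketch of the set-based comparison step, assuming hypothetical STG_CUSTOMER and MIRROR_CUSTOMER tables with a LAST_UPDATE_TS audit column (all names illustrative):

    -- One set-based pass: keep only suspicious records, classify the difference,
    -- and skip anything the real-time load has already touched after T0.
    INSERT INTO recon_diff (customer_id, operation)
    SELECT NVL(m.customer_id, s.customer_id),
           CASE
             WHEN s.customer_id IS NULL THEN 'I'   -- in Mirror only: missing in Stage
             WHEN m.customer_id IS NULL THEN 'D'   -- in Stage only: candidate for delete
             ELSE 'U'                              -- in both, but different
           END
      FROM mirror_customer m
      FULL OUTER JOIN stg_customer s
        ON s.customer_id = m.customer_id
     WHERE (s.customer_id IS NULL
         OR m.customer_id IS NULL
         OR DECODE(s.status, m.status, 0, 1) = 1)                -- a tracked column differs
       AND (s.last_update_ts IS NULL OR s.last_update_ts < :t0); -- ignore changes after T0

The row-based repair then revisits only the rows in RECON_DIFF, locking each one (e.g. SELECT ... FOR UPDATE), re-checking it against T0 and generating the event only if the difference still holds.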
Tasting Lager
Bottom brewing is one of the most complicated ways of ODS integration. You integrate on the bottom level of data logic, on the raw data of tables and columns. That means you have to understand the raw data in order to design the ODS transformations. Another disadvantage is the excessive coupling of systems, which is not so bad with legacy systems with a minimum of changes, but in the case of new, dynamically growing systems it can be really painful.
The most important prerequisites are:
Metadata-driven solution - you really have to pay attention to the manageability of the system because of the excessive coupling.
Impact analysis / change warning process - a process assuring that you will be informed in advance about any planned changes in data structures and/or their usage in all relevant (coupled) source systems.
The most frequent motivation for brewing Lager is reduction of the impact on source systems (just a technical-level footprint on the database side with CDC technology, and at most theoretical help with the redundant business logic implementation on the ODS side, so there is no huge development there). Anyway, only a few solutions have been built fulfilling the following criteria:
low latency
complex transformations
There are many CDC-based solutions with only simple transformation logic (either the data is simple or the transformation is postponed to the consuming process of the readers), but can these really be called an ODS? Is that still a beer?
Recursive events
Most transformations can be represented as queries with a result set in the format of the target table. Transformations in the ODS are used to transform data of lower levels into data of higher levels. So data from the stage (further L0) are the source of the transformation into the normalized layer (further L1), data from the L1 layer are the source of the denormalized layer (further L2), etc.
Unfortunately, it does not always work that way. For various reasons, tables of the target layer are used as a source too. So we can say that data of a lower or the same layer are used as a source for a certain layer's transformation. Using data of the same layer as a source can lead to recursive process dependencies.
In that case I just want to recall EARLY FILTERING, which, when done reasonably, helps you avoid cycling of the system.
Elasticity
You have to be ready for various types of loads from source systems. They can be small most of the time, but from time to time they can also grow to the size of bulk loads. These bulks can be either regular (Close of Business, Close of Day) or irregular (e.g. repairs of data). At the very start of my brewing I thought the load could be switched from real-time to bulk and back by some scheduler, so we could count on ROW BASED processing only. The problem is that even regular processes are not totally regular, so we would have to implement some mechanism of signals from the source system. We also met irregular or partly regular bulks that the providers of the source systems were not aware of, or that appeared during working hours. That taught us that our solution should be able to cope with both small and large bulks during the standard process.
In my opinion (I emphasize "my opinion" because there are still colleagues who do not believe me) ROW BASED fermentation is a myth. It is not elastic at all and cannot cope with larger bulks of data. And you will never avoid large bulks of data, neither in regular nor in accidental processing.
To support Elasticity you have to:
Avoid ROW BASED and process MICRO-BULKS
Make the execution ELASTIC, not stuck on a certain plan - one transformation should work in both SMALL and LARGE modes. The shared pool usually forces the execution towards the plan associated during the last hard parse.
Then you fight against two evils: wrong plans and too many hard parses. Considering the standard processing is small, you have to avoid hard parses between small runs and do them only when switching from SMALL to LARGE and from LARGE to SMALL. All SMALL runs can go on one plan, so no hard parse is necessary. There are several possibilities for managing LARGE and SMALL (a sketch follows the list):
Two statements for each transformation could be generated and hinted differently, one for SMALL and one for LARGE processing.
Statistics on the event table can be set based on the amount of data.
Two event tables (or partitions) and two statements; that lets you avoid all hard parses, as the statistics are set initially. The distributing process decides where to insert copies of events based on their number.
Define a SMALL/LARGE threshold parameter with a default value (e.g. 50 000) to decide which plan should be used. It is reasonable to allow modification of the default for particular transformations to tune the solution.
Define a MAX BULK SIZE parameter with a default value (e.g. 1 000 000) to manage the maximal size of loads and to make deleting processed events easier.
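A minimal sketch of the variant with two event tables and two differently hinted statements; the hints and names are only illustrative examples of how a SMALL and a LARGE plan might differ. The assumption is that the distributing process counts the pending events against the SMALL/LARGE threshold and routes them into EVENT_SMALL or EVENT_LARGE, so each statement keeps its own stable plan and no extra hard parses are needed.

    -- SMALL variant: index / nested-loop friendly plan for a handful of events.
    INSERT INTO l1_customer_delta (customer_id, status)
    SELECT /*+ USE_NL(e c) INDEX(c) */ c.customer_id, c.status
      FROM event_small e
      JOIN stg_customer c ON c.customer_id = e.customer_id;

    -- LARGE variant: full-scan / hash-join plan for bulk amounts of events.
    INSERT INTO l1_customer_delta (customer_id, status)
    SELECT /*+ FULL(c) USE_HASH(e c) */ c.customer_id, c.status
      FROM event_large e
      JOIN stg_customer c ON c.customer_id = e.customer_id;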
Lager bottom fermenting
Design prerequisites:
Persistent stage layer - to perform transformations you need not only the records where changes happened but also the remaining joined information needed by the transformation.
Event machine - after any change, whether it happened in the source system and was received via CDC or it is an internal change, you have to generate a change event. Based on these events the necessary transformations should be triggered and performed. The event information should contain the full data change record - e.g. a set of pairs of OLD and NEW values.
Soft delete - data should not be physically (hard) deleted in the system; it should be held temporarily as soft deleted (a deleted flag or deleted timestamp) for some time so that you can react properly to events. If a record is deleted in the source system, you still need the information necessary to identify the record in the ODS, so you should not delete lookup information physically, at least not immediately.
Audit columns - especially the timestamp of the last update on a record basis. A sketch of structures reflecting these prerequisites follows.
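A minimal sketch of structures reflecting these prerequisites, assuming a hypothetical STG_CUSTOMER stage table and an EVENT_QUEUE table (the single-table layout and all names are illustrative only):

    -- The stage table keeps soft-deleted rows and audit timestamps ...
    CREATE TABLE stg_customer (
        customer_id     NUMBER        NOT NULL,
        status          VARCHAR2(30),
        deleted_flag    CHAR(1)   DEFAULT 'N' NOT NULL,  -- soft delete instead of physical delete
        last_update_ts  TIMESTAMP NOT NULL               -- audit column: last real update
    );

    -- ... while the event queue carries the full OLD/NEW pair of every change.
    CREATE TABLE event_queue (
        event_id      NUMBER        NOT NULL,   -- surrogate key of the event
        source_table  VARCHAR2(128) NOT NULL,   -- which stage table the change belongs to
        operation     CHAR(1)       NOT NULL,   -- I / U / D
        change_ts     TIMESTAMP     NOT NULL,   -- when the change happened at the source
        customer_id   NUMBER        NOT NULL,   -- key of the changed record
        status_old    VARCHAR2(30),             -- value before the change
        status_new    VARCHAR2(30)              -- value after the change
    );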
Identification and fine grain transformation
Identification is the process of identifying target records which could be impacted by a source change. The most realistic way is to bend the query generated for the transformation: substitute the source table of interest with the event information and analytically prune all unnecessary joins and columns from the query. That way you get the target identifiers. In the fine grain transformation phase they can be joined to the transformation to load the proper data. You only have to do that for every source, and for both the OLD and the NEW value set. After the event has been used by all consumers, you can delete it; just do not forget to stay consistent, whether within or outside of transactions.
That is all, I hope it is all clear.
It is not?
Damn, I said brewing Lager is a complicated process.
Well, let's look at it step by step (a sketch of the identification and fine grain steps follows the list):
Event information - Capture the change event information. It should contain a pair of records: the values before the change and the values after the change. It also has to contain the timestamp of the change and optionally the type of the change operation. There are several possible sources, e.g. CDC, transformation procedures, triggers on tables.
Multiple consumers - You also have to consider that the information will be used by several consumers - one table can be used in several transformations, and even within one transformation it can be used several times. Just imagine the transformation as a SQL query; one table can appear there several times. It could be solved by some queue subscription mechanism, but in my brewing experience the existing queue solutions are internally too complex, as they are designed for other volumes and functions than simple event distribution. That way they often get stuck on internal locks when you start to use them really extensively. One possible solution is to make several copies of the event, one for each consumer, based on a distribution matrix. And as we spoke about early filtering, here is a place to use it too. It can also be reasonable to twin the event record here - split it into the OLD and the NEW version of the record.
Identification - is the process allowing us to identify the target records possibly impacted by certain changes. It would be a severe overhead to design the identification process manually. The optimal way is to derive the identification query from the transformation metadata. It can be done in the following steps for each source (i.e. each alias in the query):
take the query of the transformation
replace the source (alias) with the event table, for both the OLD and the NEW version of the record. If these were twinned during multiplication, it will be easier.
align join types. If our source (the one we replace with the event) is joined by an outer join, we replace it with an inner join. Some other joins can be changed based on query structure analysis, but that can be very tricky
change conditions (both joining and filtering, i.e. ON and WHERE clauses) to include deleted (soft-deleted) records
remove unnecessary joins - again, easier to say than to analyse and do
reduce filtering conditions of anti-join type
reduce unnecessary columns; we need the target identifier only (in my brewing, the reduction of columns and the reduction of joins was an iterative process)
Fine grain transformation - the best way is to use one SQL statement per source separately, which allows us to manage execution plans and transaction consistency. It can be done in the following steps:
make a statement with two sub-queries: the first is the identification query from the previous step (further ID) and the second is the transformation query (further TR)
join these sub-queries as: ID left join TR
merge the query into the target table in the following way ...
...update records that are in both TR and ID
...mark records that are in ID but not in TR as deleted
Cleansing used events - it sounds easier than it is. Consider that while you process the current events in the current step, new events can appear. They can appear from various sources in various transactions, so an ordered sequence or the use of a timestamp to decide what has been processed is impossible. There can still be older records I do not see (the READ COMMITTED aspect) at the start of my target transformation. So the task is: delete only those events you have processed. One expensive way is to mark the available events (with a flag), process only the marked ones and then delete exactly those, but that creates some unnecessary overhead. Another possibility is to fetch the used events into memory, process them and delete them afterwards (forall), which is tricky again, but efficient.
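A minimal sketch of how a derived identification query and the fine grain merge can fit together, for a hypothetical transformation aggregating STG_ACCOUNT balances per customer into L1_CUSTOMER_BALANCE; all names are illustrative and the pruning of joins and columns is shown already done:

    -- ID: target keys possibly impacted by STG_ACCOUNT changes, taken from the events
    --     (other joins were pruned because the target key is already in the event).
    -- TR: the transformation query, restricted by the left join from ID.
    MERGE INTO l1_customer_balance t
    USING (
        SELECT id.customer_id,
               tr.total_balance,
               CASE WHEN tr.customer_id IS NULL THEN 'Y' ELSE 'N' END AS del_flag
          FROM (SELECT DISTINCT e.customer_id
                  FROM event_queue e
                 WHERE e.source_table = 'STG_ACCOUNT') id
          LEFT JOIN (SELECT a.customer_id, SUM(a.balance) AS total_balance
                       FROM stg_account a
                      WHERE a.deleted_flag = 'N'
                      GROUP BY a.customer_id) tr
            ON tr.customer_id = id.customer_id
    ) s
    ON (t.customer_id = s.customer_id)
    WHEN MATCHED THEN UPDATE SET
         t.total_balance = s.total_balance,
         t.deleted_flag  = s.del_flag          -- in ID but not in TR => mark as deleted
    WHEN NOT MATCHED THEN INSERT (customer_id, total_balance, deleted_flag)
         VALUES (s.customer_id, s.total_balance, s.del_flag)
         WHERE s.del_flag = 'N';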
What the hell is the Stout doing here?
At the very start we spoke about Lagers and Ales. But there is another kind of top-fermented beer.
Stout is a bit sweeter and smooth, with a smooth, rich foam. (When I came to my pub and the bartender wanted to bring my usual Red ale, I said "it will be Guinness today," and he was surprised. I did not want to explain it was because of this article.) So is the STOUT-wise ODS - sweet. The low latency is marvelous. But the foam on top ...
Integrating the ODS this way is the fastest of all. In that process the information gets into the ODS at the same time as it gets into the back-end systems. There are more worries about integrity (Saga or a similar extra-transaction integrity process should be involved). There are also worries about the initial load; by simulation it is impossible, so if it is used for more persistent data, you have to design and develop a separate initial load. The same goes for reconciliation processes.
Anyway, we can say that this way we can brew a really real-time ODS. Not only "nearly" real-time, as people disappointed by the latency of Lager-wise bottom-fermented ODS say (good beer needs time). Stout goes well with microservices and generally with SOA architecture. It brings little additional cost to source systems, just for the initial loads and reconciliations, but that is less than all the event control on the side of the source system we have to solve in the case of the ALE-wise ones.
Consistency control
There are various levels of data consistency control.
Transactions - the simplest and most reliable way to keep a process consistent. They wrap steps into a group which is either performed completely or not at all. No half-done or uncertain states.
Distributed transactions - an extension used in distributed systems, covering the collaboration of several local transaction systems.
Extra-transaction consistency - solved by two-step confirmation. Changes of data are prepared, but the real action happens only after the final confirmation comes. That is usually solved not at the lowest level of the database engine but in the outer logic of the developed solution framework. It can still use embedded mechanisms of database engines such as Flashback.
In ODS integration we work in a heterogeneous environment, so the necessity of the third option is obvious.
The following picture shows the principle of the STOUT-wise integration: