Well filtered lager - Lager-wise Real-time Operational data store
by Ludek Bob Jankovsky, 26-Sep-2019 (ETL PATTERNS)
After the last article about beer and operational data stores and the attention it got (understand: some people read it and a few people even liked it), I got encouraged to write a sequel.
Some readers were slightly disappointed that I messed some boring data issues into a relatively cute article about beer, but most people appreciated it.
We will focus on the Lager-wise Real-time Operational data store now. As I have written, Lager-wise ODS are much more complicated and the latency is not as real-time as expected. Ale drinkers sometimes wonder what is so complicated about lager and why my glass of lager is still half full when theirs is already half empty. Despite the fact that Lager-wise Operational data stores have such disadvantages, and that it is in general not easy to brew good lager, I feel this type will soon prevail, especially in the Business intelligence world where we tend not to bother legacy and source layer application owners with extensive integration tasks, and the agile approach allows us to do that efficiently. I am partially guilty. The Metadata-driven approach and ETL patterns help make that - from the point of view of ale drinkers, silly - approach feasible and even efficient. Silly things happen sometimes. So many times.
So it is about Lager now!
As I said, this article is a sequel to the previous one and it could be reasonable to read that one first to get into the picture (to understand the difference between lager and ale).
Brewing lager, you have to be patient and humble. Brewing is an organic process and you cannot supervise every little detail of it. There is always something you will feel is redundant, not exact, or even "dirty".
Brewing lager is about an ambivalent feeling of "dirty" redundancy and uncertainty, and about early filtering to make the result clean. Understanding and accepting such ambivalence is the main success factor of the whole process. Whenever I or some of my colleagues felt something could be done in a simpler way, it turned into a slippery slope toward failure.
Lager is not simple, it needs its time and the patience of the brewer.
Gathering hops - first we have to collect signals - events
All the bottom brewing is about gathering signals and driving processes based on them. There are no workflows starting tasks, no subscriptions, no pushes. There is only CDC signaling that something changed in the source and giving us information about what has changed. These events (signals, hops) are the basis of the real-time Lager-wise processing. And because we work at the bottom level of information - on raw data - these hops come in elementary granularity, enveloped by the low level transaction frames of the source database.
The following picture shows the process from the origination of information in the source toward the low level data and back to the Operational data store.
So first we have to gather hops. That can be done in several ways; CDC with some tool able to integrate it, such as Oracle GoldenGate, Informatica Powerexchange etc., is the most common. It can even be done with triggers in a homogenous environment, or in the case your database engine does not support CDC. All these methods and their implementations have been described hundreds of times and are presented by the tool providers as if they were all you need to know and all the rest were just easy.
However, it always ends up with you having hops: elementary changes separated from the plant stem, aggregated only by transactions in the best case. As I said in the last article, we do not speak about a mirror copy of source systems where you can just materialize these hops into tables and use them however you want. That is not a lager, that is a lemonade. We speak about an operational data store, so we expect some level of transformation of these raw data into a consolidated state to be used by real-time services with strict SLA. We cannot expect these services to access shattered data and consolidate them on demand. We speak about cases when we can live with 20 seconds old data, but we must have them returned in milliseconds.
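To make the notion of a "hop" more tangible, here is a minimal sketch of how such an enveloped change record could land on our side. All names (HOP_LANDING, OP_CODE, the image columns) are illustrative assumptions, not the format of any particular CDC tool:
-- a minimal, illustrative envelope of one hop (all names are assumptions)
create table HOP_LANDING (
  SRC_TABLE     varchar2(30)   -- which source table the change belongs to
 ,SRC_TXN_ID    number         -- low level transaction frame of the source database
 ,OP_CODE       varchar2(1)    -- 'I' insert, 'U' update, 'D' delete
 ,OP_TIMESTAMP  timestamp      -- commit time of the change in the source
 ,OLD_IMAGE     clob           -- column values before the change (empty for inserts)
 ,NEW_IMAGE     clob           -- column values after the change (empty for deletes)
);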
And now it is time for the most important question of mankind:
What shall we do?
We have to get logical information from all these low level changes received from the CDC process. The question is rather how to do that. Let's call the process fermenting. And fermenting needs time, especially in the designing process. It is an organic process, so we have to forget about some ideas coming from synthetic thinking:
Forget about:
Reverse process construction - there is no analytical apparatus able to revert the business logic of source transformations automatically with all details. It could be done for particular cases manually, but it would be very expensive even in a small solution, and twice as expensive when solving changes in the business logic of source systems. These things should be solved "in numeric".
Considering the exact order of changes - there is neither time nor energy for that.
Aggregating signals by source logical event - you have to admit to yourself that these hops are separated from the stem now, so each hop can cause a separate event even though it will lead to a single transformation in the end.
Immediate consistency - you can hardly keep track of source transactions, so it can happen that a transformation starts before all events have arrived and makes only partial changes; after the remaining change data arrive, it is triggered again and the rest of the changes are considered. Especially with extra-transaction processes this can happen, so the consistency is only eventual.
So the solution is slightly "dirty". Organically dirty. We have to live with that.
Use these adapted hops for Identification - that is a really uneasy task. We know how the source information changed, but we have to figure out which records of the target table could be influenced by the change - to compute their keys. If the change only touched particular expressions, that would be easy. But the change can also touch the result of filtering, of a join condition, of de-duplication or aggregation, and all that recursively. Formalizing that in the analytical world would be very hard; it is almost impossible to analyze what action will happen at the target after a certain action in the source. Due to an anti-join effect, an insert in the source can cause a delete in the target; changes in columns used in join conditions can cause deleting of some records and inserting of others, or just updating.
I recommend being humble enough to start from the transformation mapping and solve the whole task "in numeric". Well, almost all in numeric - there is an option of "early pruning", which is a metadata variant of early filtering, and that can be done - humbly and carefully - analytically on the metadata level.
The following part is pretty hard to understand, so I will add an example of a simple transformation (in the form of an SQL query) to explain the particular steps. The example will not contain overly complex things like cascading queries, de-duplication or aggregation - just enough to understand the basic principles and not get totally confused at the same time. If it isn't clear anyway, after a few pints of lager it will be.
So let's look at the raw query describing the transformation:
select
a.ID as TARGET_ID --pk of the target table
,b.NM||'.'||a.NM as TARGET_NAME
,a.DS as TARGET_DESC
,trim(a.SOMETHING) as TARGET_VALUE_A
,b.X+nvl(c2.X,0) as TARGET_VALUE_B
,a.X+nvl(c1.X,0) as TARGET_VALUE_C
,b.SOMETHING as TARGET_VALUE_D
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID
left join SOURCE_C c1 on c1.OBJ_ID = a.ID
left join SOURCE_C c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
As we said, we need to use the "soft delete" approach in the solution, so while the query above describes the logical form of the transformation, the real query including the soft delete logic will look slightly different; it is shown below, right after the L0 layer part it relies on.
Processing hops - the L0 layer and what we need it for
If you have ever tried to reconstruct information based on partial changes, you understand that you have to keep persistent state information about the objects. Changes never happen to all their parts at once, so you have to combine the changing information with the last known state of the unchanged information. In the world of topics they call that compaction: you keep the last state of every object.
So the first task for you is to update your persistent mirrored L0 layer, kept in almost the same structure as the source, based on the change records. That is still easy; most CDC solutions end at this task, and salesmen offering the technologies use it to present how easy the whole implementation will be. Piece of cake. But not the lager.
There are the following issues ahead:
You need "soft delete" - you have to keep deleted records in the game, just marked as deleted. We will discuss the reason later in the Identification phase so I wont go too deep now, but that is the minimum of history you have to keep to make the fermentation work, trust me. I have heard so many time that "It would be so easy if you did not need the "soft delete." so trust me I have reconsidered it at least as many times. You need it!
You have to be transaction safe at this phase - however we speak about "dirty" approach in next phases when the order of changes is not assured but it is assured that at the end the re will be the so expected "happy ending" and data in the destination will be consistent, the L0 layer synchronization and generating of "identification events" should be transaction safe and ordered. So no older change should override newer one updating L0 layer and no identification event should go off before the L0 table is updated based on that hop. This is the last place in our fermentation process where order matters.
Identification - is a part of event driven process where we have to update tables at higher layers maintained as a result of transformed data of lower layers. And to do that fine grain - we cannot update all the table whenever something changes - we have to identify all records of target tables what could be hypothetically influenced by the source change. The identification is one major sub-processes of the fermenting and I will describe it in detail bellow.
Early filtering L0 - this is a place of first layer of early filtering. The CDC process pushes to you "changes" which should not be always changes for you. That happens when some of source system processes does "idle updates" just not checking if something really changed (thanks heavens these cases are rare) or if you take just subset of columns of wide table and changes appeared in columns you do not take.
Example:
We used a source table of Users to translate the internal key of a user to a readable code. We needed just these two columns, the key and the code. But in the source table there was a column we did not consider, and it contained the timestamp of the last login of the user. So whenever any user logged in, the record changed and we got the information through CDC, despite nothing visibly changing on our side. It was a relatively small table, so no stress. But the information about the user was present in many records of other tables, and every login of the user led to identification and an attempted update of all these records, like a butterfly effect. That led us to involving early filtering always at the first possible place of the fermentation process.
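To make the promised soft delete and the L0 early filtering more tangible, here is a minimal sketch of applying a single hop to the mirrored L0 table. The table L0_SOURCE_A, the bind variables carrying the hop and the restriction to the columns we actually take are illustrative assumptions, not a prescription of any tool:
-- soft delete: a delete hop only marks the mirrored record, it never removes it
update L0_SOURCE_A
   set DELETED_FLAG = 'Y'
 where ID = :ID
   and :OP_CODE = 'D';
-- insert/update hops are merged with early filtering of idle updates
-- (decode treats two NULLs as equal, which spares us explicit NULL handling)
merge into L0_SOURCE_A t
using (select :ID ID, :PARENT_ID PARENT_ID, :NM NM, :DS DS
             ,:SOMETHING SOMETHING, :X X, :STATUS STATUS
         from dual
        where :OP_CODE != 'D') h       -- delete hops are fully handled by the update above
   on (t.ID = h.ID)
 when matched then update set
       t.PARENT_ID = h.PARENT_ID, t.NM = h.NM, t.DS = h.DS
      ,t.SOMETHING = h.SOMETHING, t.X = h.X, t.STATUS = h.STATUS
      ,t.DELETED_FLAG = 'N'            -- an update also revives a previously soft deleted record
  where decode(t.PARENT_ID,h.PARENT_ID,0,1) + decode(t.NM,h.NM,0,1) + decode(t.DS,h.DS,0,1)
      + decode(t.SOMETHING,h.SOMETHING,0,1) + decode(t.X,h.X,0,1)
      + decode(t.STATUS,h.STATUS,0,1) + decode(t.DELETED_FLAG,'N',0,1) > 0   -- skip idle updates
 when not matched then insert (ID, PARENT_ID, NM, DS, SOMETHING, X, STATUS, DELETED_FLAG)
      values (h.ID, h.PARENT_ID, h.NM, h.DS, h.SOMETHING, h.X, h.STATUS, 'N');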
The following picture shows the L0 layer processing. And here is the promised real form of the transformation query, including the soft delete logic:
select
a.ID as TARGET_ID --pk of the target table
,b.NM||'.'||a.NM as TARGET_NAME
,a.DS as TARGET_DESC
,trim(a.SOMETHING) as TARGET_VALUE_A
,b.X+nvl(c2.X,0) as TARGET_VALUE_B
,a.X+nvl(c1.X,0) as TARGET_VALUE_C
,b.SOMETHING as TARGET_VALUE_D
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID and b.DELETED_FLAG != 'Y'
left join SOURCE_C c1 on c1.OBJ_ID = a.ID and c1.DELETED_FLAG != 'Y'
left join SOURCE_C c2 on c2.OBJ_ID = b.ID and c2.DELETED_FLAG != 'Y'
where b.STATUS != 'DUMMY'
and a.DELETED_FLAG != 'Y'
Let's call this transformation T1.
For the transformation T1 we will need hops coming from the three source tables (SOURCE_A, SOURCE_B, SOURCE_C), but SOURCE_C appears in the transformation twice, so there will be four sub-queues designed for the T1 transformation and four different identification statements - as we said, we will solve it "in numeric". Let's start with the source table SOURCE_A.
Let's say the table has the following columns:
ID
PARENT_ID
NM
DS
SOMETHING
X
Y
Z
STATUS
And of course the technical
DELETED_FLAG
Step 1 - Substitute a certain source table by its hop queue
For the identification process we use the query without the "deleted flag" limitations. We have to consider both valid and deleted records of these sources, to identify both the previous and the new state. In case the change results in deleting, we have to know what to delete. The soft delete lets us work with records whose sources were changed by deleting. Without the soft delete we could not perform the delete operation on the target. There is no lager without some bitterness. So, we will use the queue HOP_T1_a (hops of SOURCE_A for the transformation T1 - alias a) instead of the SOURCE_A table. For the example, imagine the queue as a table; in the real fermentation process we cannot use it that simply, because the queue has its real-time dynamics we have to deal with along the way. As described in the multiplication step below, only differences of ID, NM, DS, SOMETHING, X and DELETED_FLAG will be considered significant enough to fill records into this particular queue HOP_T1_a. Of course, you won't do that manually. Just test your metadata granularity and check whether you can generate it. For most things described here no AI is necessary, and you can trust me, you do not have enough time to do it manually.
The line "from SOURCE_A a" will be replaced by "from HOP_T1_a a":
select
a.ID as TARGET_ID --pk of the target table
from HOP_T1_a a
join SOURCE_B b on b.ID = a.PARENT_ID
left join SOURCE_C c1 on c1.OBJ_ID = a.ID
left join SOURCE_C c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
After early pruning (described in Step 2 below), the identification query for the a hops reduces to:
select
a.ID as TARGET_ID --pk of the target table
from HOP_T1_a a
join SOURCE_B b on b.ID = a.PARENT_ID
where b.STATUS != 'DUMMY'
For the c2 hops, the line "left join SOURCE_C c2" will be replaced by "left join HOP_T1_c2 c2":
select
a.ID as TARGET_ID --pk of the target table
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID
left join SOURCE_C c1 on c1.OBJ_ID = a.ID
left join HOP_T1_c2 c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
Identification
At the moment we are still in the early phase of fermenting. After we used the fresh hops to create the bottom layer (L0), we have to adapt them to be used for the identification process. Simply put, you have to compute all target keys of records which could be influenced by the source change. Each such adapted hop can be used several times, as there can be several transformations the source table appears in. The task can be imagined as a multi-consumer queue of hops.
However, whenever we tried to use a standard queue solution in this process, it appeared it sucks when the number of queues exceeds some critical amount. It was very tricky: all worked great in the Proof of concept phase and sucked in real traffic, hitting unspoken limits of the solutions.
I recommend concentrating just on the required functionality and making your own light queue solution for that.
So, how to do the identification process? The one I will describe is just one of many, but it illustrates the issues of the process and the possible steps:
Be sure the L0 record is updated - you can easily cover the hop adaptation in the same transaction as the L0 update, but be careful not to slow down the L0 updating process too much, especially for tables used in many transformations. The best approach is to write the used hops into an initial queue within the same transaction, but run the multiplication and adaptation process asynchronously.
Multiplication and adaptation - the process that multiplies our hops into separate sub-queues, one for each transformation (and if the table appears twice as a source in one transformation, then two). That makes the queue easier to manage as a multi-consumer one, because we convert the task into several single-consumer queues. We can also use the mapping information to select the columns significant for a certain mapping and apply early filtering during the multiplication, reducing records that are unchanged from the point of view of the certain transformation. In the process of multiplication we can also cut these hops into two symmetrical parts, the old and the new state. For the identification you should not care whether the information represents the previous or the current state; you have to process both the same way. (A sketch of this step follows the two matrices below.)
In the process of early filtering (which you should do while both halves, the new and the old, are still together) you eliminate the records that do not differ in the part used in a particular transformation, as well as the empty halves (i.e. the "old" half of an insert or the "new" half of a delete).
For the multiplication and for the early filtering you have to use an extract of metadata. That requires formalized metadata, and that is the first reason we need a comprehensive metadata driven solution for all this. We have to be able to generate two matrices:
Source table x Transformation matrix - that is easy even with metadata that is not formalized in a very fine grain. These are just the source tables of the mappings.
Source column x Transformation matrix - this is a bit tricky, because you have to consider all columns used either in transformation expressions, or in join conditions, or in filtering, aggregation or de-duplication. Not many metadata systems have such a detailed granularity to make this safe.
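Here is the promised sketch of the multiplication and early filtering for the a hops of T1. The initial queue HOP_SOURCE_A with OLD_/NEW_ column images, an OP_CODE and a derived deleted flag is an illustrative assumption about how the raw hops are stored; only the columns significant for T1 are checked and kept:
-- illustrative sketch: multiply raw SOURCE_A hops into the sub-queue of transformation T1,
-- keep only hops where a column significant for T1 really differs (early filtering)
-- and split every surviving hop into its "old" and "new" half
insert into HOP_T1_a (ID, NM, DS, SOMETHING, X, DELETED_FLAG)
with changed as (
  select h.*
    from HOP_SOURCE_A h
   where decode(h.OLD_NM          ,h.NEW_NM          ,0,1)
       + decode(h.OLD_DS          ,h.NEW_DS          ,0,1)
       + decode(h.OLD_SOMETHING   ,h.NEW_SOMETHING   ,0,1)
       + decode(h.OLD_X           ,h.NEW_X           ,0,1)
       + decode(h.OLD_DELETED_FLAG,h.NEW_DELETED_FLAG,0,1) > 0
)
select OLD_ID, OLD_NM, OLD_DS, OLD_SOMETHING, OLD_X, OLD_DELETED_FLAG
  from changed where OP_CODE != 'I'        -- the "old" half does not exist for inserts
union all
select NEW_ID, NEW_NM, NEW_DS, NEW_SOMETHING, NEW_X, NEW_DELETED_FLAG
  from changed where OP_CODE != 'D';       -- the "new" half does not exist for deletes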
The following picture shows the multiplication and adaptation process:
You can notice we kept just the necessary primary key of the target table in the result set.
Step 2 - Reduce unnecessary parts of query (Early pruning)
There are still a few open things:
It is dangerous to keep the WHERE condition in the query - there could be elements influencing the target result set, for example an anti-join. The safest thing is to remove the WHERE condition too; you can also try to evaluate the condition and check for characteristic signs of anti-conditions, e.g. NOT IN etc. (a hypothetical anti-join example follows the pruned query below). In any case, it is good to allow the final decision to be impacted by some kind of hint / pattern directive toward one or the other option. In this case, let's say the generator decided the WHERE condition is not dangerous, so it keeps it there.
Early pruning - let's prune the query to get rid of unnecessary parts. The process is not trivial and I won't describe it in detail here. Either you can imagine it or you would not understand it anyway. (Maybe in some further article I will go into depth ;)
c1 will be removed. It is a left join not influencing the a table; its attributes are used neither in a key, nor in the where condition, nor in any remaining join condition. We won't mess with de-duplication, aggregation, analytical functions or cascading queries here; it would be too complicated to explain just now.
c2 will be removed for the same reason.
b won't be removed; it is an inner join, so it can have a filtering effect.
But there is much more in the c2 case. Not only can we remove the c1 line because it is a left join without consequences, we can also change the LEFT JOIN of the hop queue to JOIN in this case, because we will work only with records impacted by the queue, so the join takes on its filtering role.
select
a.ID as TARGET_ID --pk of the target table
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID
join HOP_T1_c2 c2 on c2.OBJ_ID = b.ID
where b.STATUS != 'DUMMY'
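As promised above, here is a hypothetical illustration of why a WHERE condition can be dangerous for pruning. SOURCE_D and its column BLOCKED_ID are invented for this illustration only; they are not part of our T1 example:
-- hypothetical variant of the transformation with an anti-join in the filter
select a.ID as TARGET_ID
  from SOURCE_A a
  join SOURCE_B b on b.ID = a.PARENT_ID
 where b.STATUS != 'DUMMY'
   and a.ID not in (select d.BLOCKED_ID from SOURCE_D d where d.DELETED_FLAG != 'Y')
-- an INSERT into SOURCE_D shrinks the result set, i.e. it causes DELETEs in the target,
-- so hops of SOURCE_D would have to be gathered too and such a condition must never be pruned away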
Step 3 - Mix it together
Now it is time to mix all these identifiers for a certain transformation together and reduce the multiplicities. Then the set of identifiers can be used to run the fine grain transformation (a concrete sketch for T1 follows below, just before the final touch).
from [ID queue]
left join [transformation]
Short explanation:
We use the ID queue and left outer join the transformation. That way we get all identified records (including the delete candidates) and the DELETED_FLAG indicating their (non)presence in the transformation.
As shown in the picture above, we will do the same for SOURCE_B and, twice, for SOURCE_C (c1 and c2).
I won't describe it here for each source, just for c2. It is the last one, and along with c1 it is slightly different because of the left join and its relative isolation (which makes things easier - don't get mistaken by that; outer joins can be much more complicated, impacting keys, used as anti-joins, necessary for further joins etc.).
So let's say the SOURCE_C table has the following columns:
OBJ_ID
STATUS
X
DELETED_FLAG
As I have described in the multiplication step above, only differences of OBJ_ID, X and DELETED_FLAG will be considered significant enough to fill records into this particular queue HOP_T1_c2.
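Putting Step 3 together for our example, the mixing might look like the following sketch. The a and c2 branches are the pruned identification queries shown above; the b and c1 branches are my analogous guesses built by the same substitution and pruning rules, and the single-column structure of ID_T1 is an assumption:
-- illustrative sketch: mix the identifiers of all four sub-queues and reduce multiplicities
insert into ID_T1 (TARGET_ID)
select distinct TARGET_ID
  from (select a.ID as TARGET_ID           -- identification over the a hops (pruned in Step 2)
          from HOP_T1_a a
          join SOURCE_B b on b.ID = a.PARENT_ID
         where b.STATUS != 'DUMMY'
        union all
        select a.ID                        -- identification over the b hops
          from SOURCE_A a
          join HOP_T1_b b on b.ID = a.PARENT_ID
         where b.STATUS != 'DUMMY'
        union all
        select a.ID                        -- identification over the c1 hops
          from SOURCE_A a
          join SOURCE_B b on b.ID = a.PARENT_ID
          join HOP_T1_c1 c1 on c1.OBJ_ID = a.ID
         where b.STATUS != 'DUMMY'
        union all
        select a.ID                        -- identification over the c2 hops (pruned above)
          from SOURCE_A a
          join SOURCE_B b on b.ID = a.PARENT_ID
          join HOP_T1_c2 c2 on c2.OBJ_ID = b.ID
         where b.STATUS != 'DUMMY');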
Final touch
And now we are almost there. We have all these identifiers, so we know which records should be updated. The fact that these identifiers appeared in the ID queue means the records are candidates for change. It does not say what change it should be - whether insert, update or delete. We have got used to MERGE statements allowing us not to care whether we should insert or update. So that part is easy.
What is not easy is the DELETE operation.
Anything could change, and it depends only on the transformation what operation it should cause, if any. And here is the secret. From the very start of the article I held back the reason for the soft delete. From the very start we do the process of identification including both the old and new states, including both valid and deleted records.
But what could be the reason the transformation does not return records that were identified?
All records of the target table identified by the ID queue and not returned by the transformation query have to be deleted!
The picture above shows the following sets of interest (we do not care here about anything beyond our ID queue):
INSERT - Identified records in the transformation which are not in the target table should be inserted.
UPDATE - Identified records which are both in the transformation and in the target table should be updated. Of course we update them only in case they differ (the last stage of early filtering), but I think I should not remind you of it any more. There could also be "soft deleted" records among them; we update them into the "undeleted" state (operation REVIVE).
DELETE - Identified records which are in the target table but not in the transformation should be deleted (soft deleted). The reason is that something happened to the source and these records are no longer returned by the transformation.
IGNORE? - Identified records which are neither in the transformation nor in the target table can be either ignored or inserted as soft deleted. Both options are possible, but the fact that these records exist means they would have been deleted before, only the reason for them disappeared before their initial load. That is usually a marginal set, and the time of the initial load usually has no business meaning, so it should not impact the state of our data. That is the reason I recommend inserting records of this set as "soft deleted" records. It will pay off during further integration.
The following sample pattern will finish our example to its final taste. It is a bit rough and should be refined, but for the big picture of the lager brewing process the simplification was necessary.
At the core of the following process is the join of our "ID queue" ID_T1 and the transformation T1.
with T1 as(
select
a.ID as TARGET_ID --pk of the target table
,b.NM||'.'||a.NM as TARGET_NAME
,a.DS as TARGET_DESC
,trim(a.SOMETHING) as TARGET_VALUE_A
,b.X+nvl(c2.X,0) as TARGET_VALUE_B
,a.X+nvl(c1.X,0) as TARGET_VALUE_C
,b.SOMETHING as TARGET_VALUE_D
from SOURCE_A a
join SOURCE_B b on b.ID = a.PARENT_ID and b.DELETED_FLAG != 'Y'
left join SOURCE_C c1 on c1.OBJ_ID = a.ID and c1.DELETED_FLAG != 'Y'
left join SOURCE_C c2 on c2.OBJ_ID = b.ID and c2.DELETED_FLAG != 'Y'
where b.STATUS != 'DUMMY'
and a.DELETED_FLAG != 'Y'
)
select
nvl(ID_T1.TARGET_ID,T1.TARGET_ID) as TARGET_ID --that way we get the ID even when T1 returns no record
,T1.TARGET_NAME
,T1.TARGET_DESC
,T1.TARGET_VALUE_A
,T1.TARGET_VALUE_B
,T1.TARGET_VALUE_C
,T1.TARGET_VALUE_D
,case when T1.TARGET_ID is null then 'Y' else 'N' end as DELETED_FLAG
from ID_T1 --surprise, we start with the identification because it is leading in that case
left join T1 on T1.TARGET_ID = ID_T1.TARGET_ID -- and of course left join, we need these deleted records
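For completeness, here is a hedged sketch of how such a result set could be applied to the target table. The target name TARGET_T1 is an assumption, and RESULT_T1 stands for the whole query above wrapped into a view or inlined as a subquery; the four sets described above (INSERT, UPDATE/REVIVE, soft DELETE and the IGNORE set inserted as soft deleted) then collapse into a single MERGE:
-- illustrative sketch only: TARGET_T1 and RESULT_T1 are assumed names, not part of the example above
merge into TARGET_T1 t
using RESULT_T1 s
   on (t.TARGET_ID = s.TARGET_ID)
 when matched then update set
       t.TARGET_NAME    = s.TARGET_NAME
      ,t.TARGET_DESC    = s.TARGET_DESC
      ,t.TARGET_VALUE_A = s.TARGET_VALUE_A
      ,t.TARGET_VALUE_B = s.TARGET_VALUE_B
      ,t.TARGET_VALUE_C = s.TARGET_VALUE_C
      ,t.TARGET_VALUE_D = s.TARGET_VALUE_D
      ,t.DELETED_FLAG   = s.DELETED_FLAG   -- covers UPDATE, REVIVE and soft DELETE at once
      -- the last stage of early filtering (update only when something really differs) is omitted,
      -- and so is the decision whether soft deleted records keep or clear their attribute values
 when not matched then insert
      (TARGET_ID, TARGET_NAME, TARGET_DESC, TARGET_VALUE_A
      ,TARGET_VALUE_B, TARGET_VALUE_C, TARGET_VALUE_D, DELETED_FLAG)
      values
      (s.TARGET_ID, s.TARGET_NAME, s.TARGET_DESC, s.TARGET_VALUE_A
      ,s.TARGET_VALUE_B, s.TARGET_VALUE_C, s.TARGET_VALUE_D, s.DELETED_FLAG);
-- records of the IGNORE set come with DELETED_FLAG = 'Y' and empty attributes,
-- so they end up inserted as soft deleted, exactly as recommended above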