Lager-wise Real-time Operational data store and Elasticity
by Ludek Bob Jankovsky, 30-Sep-2019 (ETL PATTERNS)
After the last two articles about beer (well, with a taste of some boring data processing and Operational data stores), and especially the last one sliding on the surface of Lager brewing (the Lager-wise Real-time ODS ETL process), we can go deeper into the aspects of optimization and elasticity of the process.
When I was first getting familiar with real-time processing, I imagined I would solve only operational, row-based changes, and that all the bulk operations such as EOD processing, EOM processing etc. would be handled some other way. In the rush of real-time problems we just postponed that somewhere to the future. And in that "future", which is now the past, we learned that there is no one and nothing that will solve it for us. There is no synchronization mechanism allowing us to stop real-time processing, switch to bulk processing and then return to real-time seamlessly. First there is the synchronization and process management problem, but more important is that not all bulk operations are well mapped and planned in the source system. We cannot tell users: "I am sorry, we do not have up-to-date data now, because the system XY is doing some bulk operation we cannot cope with in real time." Our real-time solution should be ready even for bulks. Just to be clear.
In the process of Lager brewing we use a similar kind of transformations as in bulk processes. In the majority of cases these queries are executed for a few rows, so they should run with different execution plans than the bulk operations. At the same time we run similar queries molded by the requirements of identification with different leading data sets, which impacts the necessary plans too. And on the other hand, we can always expect some source bulk, when we cannot run these transformations row-based and the optimizer has to behave similarly to bulk processing.
When I teach optimization of SQL queries, I classify tasks as either row-based, bulk, or hybrid ones.
The task of a Lager-wise Real-time ODS is a hybrid one, and the most complicated, because of the volatility of its requirements.
The following edges should be considered:
They are volatile - Every execution depends on the volume of queued data (hops, IDs), and each of them may require a different execution plan.
They are plenty - Although I have illustrated the steps of the brewing as particular SQL queries, these are not a few individual queries. There are thousands of queries running every second and the solution should be stable and scalable. Any wrong plan could shake the fluency. That is why the solution should be well monitored and discrepancies tracked and solved immediately.
They are multifaceted - Every transformation query is molded and used with different predicates, i.e. with different plans. In row-based execution we have to maintain many more indexes than in classical OLTP solutions to support all of them. We also have to assure the predicate is always pushed down to the detail levels of the execution, which can be tricky in cases of analytical functions or aggregations.
They are concurrent - We cannot lock any queue during our execution because we have to stay fluent. We have to count on new items appearing in every queue while the previous batch is still being processed.
Volatility and bulks
As I said, there will always be bulks in our data processing. They can be of the following types:
Periodical business bulk operations in source systems - Despite the 24x7 characteristics of most source systems there are still periodical actions like Close of day, Close of month or Close of year, i.e. operations triggered in the source system by the closure of a time period. They are usually well planned, and there is a scenario of stopping real-time processing during these actions (well, in a 24x7 system that would be a temporary breach of consistency of the real-time ODS), but there are not always firm time frames, and these organizational steps would require additional communication between the source systems and the Real-time ODS process management to stop and then restart in time. So it is much better (I cannot say easier) to make your real-time ETL processing able to absorb bulks organically, without external process management actions.
Periodical technical bulk operations in source systems - Unlike the previous category, these actions are related to periodical backup and clean-up jobs resulting in data changes. They do not always have a severe impact on data integration, but it is necessary to count on them.
Unintended small bulks (bubbles) caused by data processing complexity - Data processing is not always as smooth as we would like. A small regular data change can cause huge changes in dependent tables thanks to the multiplication effect: especially changes in lists of values, which are relatively stable and almost immutable, changes in master data (e.g. repairs of wrong consolidation), etc.
Reconciliations and repairs of source systems after their failures or downtime - These things cannot be planned and are not considered standard processing, but they happen. And we do not build our Real-time Operational data store for standard situations only, we build it for real life.
Reconciliation and repairs after our own system failure or downtime - The same as the one above. There are failures, breaches and issues in real life.
So, let's look at possible solutions:
TYPE A: - Very efficient row based processing
Row-based does not mean literally row-by-row processing. Depending on the implementation, every atomic operation costs some collateral expense for statement execution. Even if we reduce hard parses and cache everything, it is still usually better to work in micro-batches than literally row by row. I will mention that later in detail. For now, the term row-based means either row by row or micro-batches aimed at small numbers of records in the data sets (e.g. tables), using indexes and pushing predicates extensively.
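Just to make the term concrete, here is a minimal, hedged PL/SQL sketch of such a micro-batch loop. The queue and target table names (ODS_QUEUE_CUSTOMER, ODS_CUSTOMER) and the batch size of 500 are invented for illustration; the only point is that keys are fetched and applied in small portions rather than literally one by one:
-- Minimal micro-batch sketch (all names are illustrative only).
DECLARE
  TYPE t_ids IS TABLE OF ods_queue_customer.customer_id%TYPE;
  v_ids t_ids;
  CURSOR c_queue IS
    SELECT customer_id FROM ods_queue_customer ORDER BY queued_at;
BEGIN
  OPEN c_queue;
  LOOP
    FETCH c_queue BULK COLLECT INTO v_ids LIMIT 500;   -- micro-batch of up to 500 keys
    EXIT WHEN v_ids.COUNT = 0;
    FORALL i IN 1 .. v_ids.COUNT                       -- one small, index-friendly step per batch
      UPDATE ods_customer t
         SET t.refresh_ts = SYSTIMESTAMP
       WHERE t.customer_id = v_ids(i);
    COMMIT;
  END LOOP;
  CLOSE c_queue;
END;
/
The amortized per-statement overhead is what makes such micro-batches cheaper than a literal row-by-row loop, while the plans still stay of the small, indexed kind.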
Row-based processing is nice because of its almost linear relationship between load and time, starting theoretically at zero-zero. So why not rely on that and just let these ETL processes work?
The problem is not that row-based processing is inefficient with a large load. It is equally efficient with small and large amounts of records. The problem is in the word "equally", because we need our system to work more efficiently with large bulks.
The problem is not that row-based is slow, but that bulks are large, and that we need them processed much faster and more efficiently than individual rows.
The following picture shows the reason. The dissolving phase of row-based processing takes a long time. Everything is proportional, but because of the size of the bulk it is just too slow. We can usually afford a bigger latency during a nightly Close of month, out of business hours, but we want our system working properly in the morning.
From the graph above you can see why we use the SMALL - LARGE switch.
And now an important question:
How to decide what the SMALL/LARGE row count is?
In some very serious resources you can read it is 42. I think it can differ statement by statement; in my experience the default can be set to 40,000-100,000 records on an Oracle server, but it is always good to allow designers to change the value for particular statements (pattern directive, hint) if necessary.
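As a minimal sketch of that decision (all the names - the directive table ODS_STMT_DIRECTIVE, the queue table and the RUN_STATEMENT procedure - are hypothetical), the executor simply compares the known queue size with the default or overridden limit:
DECLARE
  v_limit     PLS_INTEGER := 50000;    -- global default in the 40,000-100,000 range
  v_queue_cnt PLS_INTEGER;
BEGIN
  BEGIN
    -- per-statement override (pattern directive), if defined
    SELECT small_large_limit INTO v_limit
      FROM ods_stmt_directive
     WHERE stmt_id = 'TR_CUSTOMER_ADDRESS'
       AND small_large_limit IS NOT NULL;
  EXCEPTION WHEN NO_DATA_FOUND THEN NULL;   -- keep the default
  END;

  SELECT COUNT(*) INTO v_queue_cnt FROM ods_queue_customer;

  IF v_queue_cnt <= v_limit THEN
    run_statement('TR_CUSTOMER_ADDRESS', 'SMALL');   -- row-based plan (indexes, nested loops)
  ELSE
    run_statement('TR_CUSTOMER_ADDRESS', 'LARGE');   -- set-based plan (full scans, hash joins)
  END IF;
END;
/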
TYPE B: - Small - Large approach
The following graph shows a comparison of row-based and bulk processing efficiency. Bulk (set-based) processing is a total loser around small numbers of processed records. The difference is so significant that it is easy to seek out and alert on wrong plans, missing indexes, not-pushed predicates etc. There is no doubt we need the fine-grained row-based processing 90-99% of the time.
On the other hand, bulk processing using full scans of tables and hash joins is much more proficient with big amounts of data. So the task is to be able to run every statement either row-based or set-based, depending on the incoming amount of data.
And there is one big advantage we get from the Hops and IDs processing: we know how many rows are in the queue. In each one!
So, how to achieve that? How to make queries work based on the amount of incoming data?
There is one quick and simple answer: the optimizer of the DB engine will do the work based on statistics. Like so many quick and simple answers, it is not very realistic. I cannot speak badly about the DB optimizer itself. In the case of our solutions it does unbelievable things, unbelievable pushing of predicates and other incredible work. No organic brewing described in the last article would be possible without the great work of the optimizer. But...
The optimizer works based on statistics - but we cannot recalculate statistics before each run of an atomic statement, and the size of the queues is volatile. We can of course rely on dynamic sampling, but that burdens the database core with overhead too.
The optimizer remembers its last resolutions - it does not do a hard parse and does not optimize again and again for repeating statements. So running the same statement several times in a row automatically gets stuck on the first plan, based on the first queue size.
The previous point is reasonable - causing a hard parse before each atomic transaction would boil the batch.
Two ranges are enough - you have to run each statement in two modes, the SMALL one and the LARGE one. There are several ways to do that, and they depend partially on the features of the DB engine (a hedged sketch follows this list). So you can:
Use two distinct statements instead of one, each forced to a different plan. The switch between them can be decided based on the size of the leading queue. There is no fear of redundancy, as these statements are generated from the same metadata.
Set statistics on the queue at the moment its size switches between SMALL and LARGE and vice versa. The idea of setting realistic statistics, based on the previously learned queue size, before each step fails on the fact that setting statistics causes a hard parse of the statement (whenever it is supposed to have any effect), and that would be an overhead at so many atomic executions; so the number of switches should be minimized.
Use two queues, one for SMALL and one for LARGE - possibly combined with different generated code for small and for large (different early pruning; the LARGE statement has fewer limitations, so sometimes it can be reasonable). This solution is more complicated, as you have to decide how and when a queue becomes the LARGE one, but it helps to dissolve bulks at the same time as new changes are processed.
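A hedged sketch of the first two options, with invented table names. The SMALL and LARGE variants below are the same generated transformation, differing only in the optimizer hints that force the plan; the DBMS_STATS call illustrates the statistics-switching alternative:
-- SMALL variant: driven by the small queue, nested loops over indexes.
MERGE INTO ods_customer_address t
USING (SELECT /*+ LEADING(q c a) USE_NL(c a) INDEX(c) INDEX(a) */
              c.customer_id, a.address_id, a.city
         FROM ods_queue_customer q
         JOIN l0_customer c ON c.customer_id = q.customer_id
         JOIN l0_address  a ON a.customer_id = c.customer_id) s
ON (t.customer_id = s.customer_id AND t.address_id = s.address_id)
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (customer_id, address_id, city)
                      VALUES (s.customer_id, s.address_id, s.city);

-- LARGE variant: identical text generated from the same metadata, only the hints
-- differ: /*+ LEADING(q c a) USE_HASH(c a) FULL(c) FULL(a) */

-- Statistics-switching alternative: when the queue crosses the boundary, tell the
-- optimizer a new size and invalidate cursors so the next parse picks a new plan.
BEGIN
  DBMS_STATS.SET_TABLE_STATS(ownname       => USER,
                             tabname       => 'ODS_QUEUE_CUSTOMER',
                             numrows       => 1000000,
                             no_invalidate => FALSE);
END;
/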
The following graph shows various paths of working with the learned queue size and the setup of statistics (spilling the number to the optimizer). It shows both feasible and not feasible variants:
Explanation:
Row by row - The lazy way: we decide to keep all the processing row-based (as described in the chapter above). I classified it as not feasible because of the inability to dissolve bulks in time (to some extent it can make the system stuck), but it can work somehow; at least you spare the effort of switching between small and large and maintaining two execution plans, and at least the most frequent SMALL phase works efficiently, as long as it does not have to dissolve the bulks of the previous hard night.
Realistic statistics - Another one I classified as not feasible. On one hand I trust the optimizer (a bit) and it would work well with the plans, but it increases the number of hard parses in the most frequent zone of SMALL operations. It causes a huge performance issue once the system becomes big enough.
White lie - My favorite solution. I trust the optimizer a bit, but not so much as to let it decide in the volatile zone around the SMALL-LARGE boundary. This type of truth bending makes the SMALL-LARGE decision binary and counts on two plans, two ways. If you decide for two different statements for SMALL and LARGE executions (a small overhead while generating, but many more possibilities for hard-core tuning), that is the best way. The advantage is that we hold our finger on the execution process firmly; the disadvantage is the impact of a wrongly set SMALL-LARGE limit.
Partial white lie - This way is a compromise between the White lie and the Realistic statistics. It combines the advantage of a trusted optimizer with the reduction of hard parses in the most frequent SMALL zone. The advantage is less sensitivity to a SMALL-LARGE limit set too low; the disadvantage can be imperfect work of the optimizer in the volatile zone (a sketch follows below).
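For illustration only, reusing the hypothetical names from the executor sketch earlier, the decision branch of the Partial white lie could look roughly like this: below the deliberately low limit the fixed SMALL variant runs with no re-parse at all; above it, the queue statistics are refreshed with a realistic number and an unhinted variant is left to the optimizer.
IF v_queue_cnt <= v_limit THEN
  run_statement('TR_CUSTOMER_ADDRESS', 'SMALL');          -- fixed row-based plan, no hard parse
ELSE
  DBMS_STATS.SET_TABLE_STATS(ownname       => USER,
                             tabname       => 'ODS_QUEUE_CUSTOMER',
                             numrows       => v_queue_cnt, -- the realistic number this time
                             no_invalidate => FALSE);      -- accept one hard parse here
  run_statement('TR_CUSTOMER_ADDRESS', 'OPEN');            -- unhinted variant, optimizer decides
END IF;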
Advanced White lie
The concept of the white lie comes from the presumption that the optimizer sometimes fails at boundaries. We do not give it the chance and screw the decision down to a binary option - ROW or BULK. But where did we get the empirical 100,000 records? Is it common for every transformation? Is it too low or too high? (The Partial white lie method solves the problem by setting the limit lower and letting the optimizer work out the rest.)
We have two options:
Assure the possibility to set an overriding value for each transformation, as a pattern directive or hint, and fill it in during the manual tuning process when necessary.
Let the system self-tune the limit based on statistics of previous runs.
The first option is a good standard; it costs almost nothing and can help in any case. The second one is a bit more complicated, but it can lead you to the best target performance around the SMALL-LARGE boundary.
First, define a volatile zone around the SMALL-LARGE boundary, small enough not to impact the efficiency of the SMALL zone, but large enough to make it probable to gather some executions in that zone (e.g. 15% of the SMALL-LARGE limit).
Second, whenever the execution gets into the volatile zone (it will not happen often, because the zone is off the most frequent part of the small zone), randomize the execution plan (small/large) and collect a supplemental log of the method (plan), the size of the queue and the execution time.
Third, after gathering enough statistics, reconsider the particular SMALL-LARGE limit, moving it either lower or higher together with the volatile area.
After gathering a large enough sample of results (you have to consider some level of volatility of the available system resources too, so let the sample be big enough), just compute the linear regression for both sets.
Fourth, after computing the linear regression, just find the intersection of the two lines and push the SMALL-LARGE limit that way (a sketch of such a query follows below). Our defined volatile area will move too. The results of previous statistics should not be deleted: by shifting the SMALL-LARGE limit we changed nothing about the transformations, so our statistics data are still valid. The only reason for deleting the statistics should be a significant change of the transformation logic or of the patterns used.
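A hedged sketch of that computation, assuming a hypothetical log table ODS_EXEC_LOG(STMT_ID, PLAN_TYPE, QUEUE_SIZE, ELAPSED_SEC) filled by the randomized executions from the second step. Oracle's REGR_SLOPE and REGR_INTERCEPT fit one line per plan type, and the intersection of the two lines gives the new limit:
-- elapsed_sec ~ a * queue_size + b, fitted separately for the SMALL and LARGE plans
WITH fits AS (
  SELECT plan_type,
         REGR_SLOPE(elapsed_sec, queue_size)     AS a,
         REGR_INTERCEPT(elapsed_sec, queue_size) AS b
    FROM ods_exec_log
   WHERE stmt_id = 'TR_CUSTOMER_ADDRESS'
   GROUP BY plan_type)
SELECT ROUND((l.b - s.b) / (s.a - l.a)) AS new_small_large_limit   -- intersection of the lines
  FROM fits s
  JOIN fits l ON l.plan_type = 'LARGE'
 WHERE s.plan_type = 'SMALL';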
Note: The question could appear why we do not use the capability of the optimizer to solve the mess in the volatile area instead of such a linear regression circus. This specialized action is based on our knowledge of the character and shape of our operations. The optimizer has more advanced resources, but we "natives" know the terrain of our land much better. That is why my pessimistic approach to tuning is usually so successful.
They are plenty
Most systems have their performance limits much lower than their functional limits, which are sometimes still lower than the documented limits.
That is what has hit us so many times in our solutions. We decide on some technology feature in order not to reinvent the wheel, it works great in the proof-of-concept phase on a subset, it sometimes even works well during simulated peak performance tests, but it fails when the usage grows to the full extent of our solution. The problem is that we adapt technologies tested for intensive usage on small amounts of data to big amounts of data with thousands of transformation tasks. They are simply plenty.
The following factors have to be considered:
Number of transformations (T) - the number of transformation modules, mappings ...
Number of source tables (S) - the number of event-processing-enabled source tables attending transformations as sources (e.g. tables of the L0 layer, further or recursively used tables of the L1 layer).
Number of listening processes (L) - the number of permanently listening processes.
Number of processing threads (R) - the number of threads the system allows for listening, running transformations and everything else necessary around our brewing.
Plain spread
The simplest variant of the solution (the most parallel, but quickly reaching the limits) is the one listener - one thread method. In this arrangement, each process, either related to processing hops or to the transformations themselves, has its own permanently listening thread. That gives each process the same priority, and even the smallest transformation takes one permanently listening process. The solution is the simplest and reduces the need for decisions about the importance of particular transformations or tables.
R = L
L = S + T
Thread groups - Transformations and source processes are collected into thread groups. Their size and organization can be based on the priority and SLA of particular transformations. All the signal processing is then aggregated into a smaller number of groups. The solution is more complex, but you can scale the performance better, diversify preferences and suppress the unimportant parts of the solution.
R = L
L < S + T
Dispatchers - Listening and processing can be separated. The solution brings some overhead connected with starting new processes if they are not organized in pools. (A small numeric illustration of the three variants follows below.)
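For illustration only, with hypothetical numbers: with S = 300 event-enabled source tables and T = 700 transformations, the plain spread needs L = S + T = 1000 permanently listening threads, and R = L = 1000. Collecting them into, say, 50 thread groups reduces this to L = R = 50, at the price of deciding which transformations share a group and a priority. With dispatchers, a handful of listeners can feed a pool of, say, 20 worker threads, so R is decoupled from L entirely.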
They are multifaceted
There is another difference of Lager-wise solutions - one transformation appears in several facets. One transformation query is used in several ways, with several drivers (leading tables) and with different plans. That requires much more extensive indexing, because every transformation in each facet should be supported for row-level access by relevant indexes. The indexing should be "bi-directional" from the start: when you index one column to realize a join in one direction, you have to index the column on the other side as well. In many cases that is just about primary key and foreign key indexes, but not all queries go through standard references.
The following picture shows the access paths for the facets of one transformation (the one from our example in the previous article).
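To make the "bi-directional" indexing concrete, here is a small hedged sketch with invented table names: the same join appears in two facets, once driven by the customer queue and once by the address queue, so the join column has to be indexed on both sides:
-- Facet 1: leading set = changed customers; needs an index on L0_ADDRESS(CUSTOMER_ID).
SELECT c.customer_id, a.address_id, a.city
  FROM ods_queue_customer q
  JOIN l0_customer c ON c.customer_id = q.customer_id
  JOIN l0_address  a ON a.customer_id = c.customer_id;

-- Facet 2: leading set = changed addresses; walks the same join the other way and
-- needs L0_CUSTOMER(CUSTOMER_ID) (typically the primary key) to be indexed as well.
SELECT c.customer_id, a.address_id, a.city
  FROM ods_queue_address q
  JOIN l0_address  a ON a.address_id  = q.address_id
  JOIN l0_customer c ON c.customer_id = a.customer_id;

-- The extra index that classical OLTP modelling would often omit:
CREATE INDEX l0_address_cust_ix ON l0_address (customer_id);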
They are concurrent
In most previous examples I have presented these transformations as if they worked on static data sets. But while we process the data of a queue (micro-batch), new data appear in the queue. We cannot block them, but we have to know which part of the queue has been processed so we can cleanse it from the queue. So what can we do:
Standard queue - We can use some time-proven queue solutions, but besides what we need, they usually support many other roles, which makes them a bit heavy for our purpose, and we can hit their performance limits easily, because "they are plenty" is still valid.
Bookmark - Bookmarking the batch start is not a bad solution, but the problem is that in the world of concurrent writing transactions in read-committed isolation you can rely neither on a sequence nor on a timestamp. Other isolation levels are out of the question for this task.
Flagging - We can set flags on records and so decide which records will be part of the micro-batch (see the sketch below). Unfortunately, setting the flag is a somewhat expensive operation for our purpose.
Statement isolation - We can use statement isolation - read-use-delete in one statement (maybe using some memory structure to help). That requires a totally homogeneous environment.
In-memory indexing - A less expensive variant of flagging. It is relatively flexible (it allows limiting the maximum bulk size).
Keep in mind that you have to act transaction-safe to keep the system consistent.
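A hedged sketch of the flagging variant, again with invented names: the batch is cut by marking at most N waiting rows with a batch identifier, the transformations are driven by exactly those rows, and only those rows are deleted, so anything arriving concurrently simply waits for the next cycle. The ROWNUM predicate also caps the maximum bulk size discussed below:
-- 1) Cut the micro-batch: flag at most 100,000 not-yet-flagged rows.
UPDATE ods_queue_customer
   SET batch_id = :batch_id
 WHERE batch_id IS NULL
   AND ROWNUM <= 100000;

-- 2) Run the transformation(s) driven by this batch only, e.g.
--    ... FROM ods_queue_customer q WHERE q.batch_id = :batch_id ...

-- 3) Cleanse exactly what was processed; rows queued meanwhile stay for the next run.
DELETE FROM ods_queue_customer
 WHERE batch_id = :batch_id;

COMMIT;   -- steps 1-3 belong to one transaction to keep the queue consistent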
Limitations of bulks
There are the following reasons for limiting the maximal bulk size:
Bulk processing in the database - In our previous graphs we presented bulk processing as stable, with only slowly growing characteristics at big numbers of records. Bulk processing uses the HASH JOIN method extensively, and even there some limitations appear at huge numbers (we are speaking about the range of 10,000,000 multiplications).
In-memory bulk index limits - There is some overhead in the case of huge bulks if you use this method.
Fail-over - Too large and long-running bulks cause a longer rollback time and repeat time when the process is interrupted.
Process continuity - In the case of chained processes, smaller bulks allow the continuing process to start earlier, so the overall latency gets smaller.
The optimal bulk size limit again depends on the process; it can be set to a few million by default and refined by pattern directives if necessary.
Well, the Lager is still not perfect, but it has a much smoother, more elastic taste. Just try it in your pub.