SIMPLELOAD - Elementary 'last phase' solution for ETL
by Ludek Bob Jankovsky, 31-May-2017 (ETL PATTERNS)
Despite on poorest of the solution in comparison to true metadata driven ETL, I often meet transformations realized as views with almost the same format as the target table, followed by the pattern based implementation of the final stage - loading the transformed source data into target table, considering current data and maintaining history.
I have to admit the smart approach is proper for small and quick win solutions.
For easy implementation I scripted a skeleton of the last phase load element based on data dictionary information, supporting the most common patterns.
Skeleton? It is improbable I could predict all possible needs of particular solutions, so I decided to start with the simplest code depicting the concept, but still simple and understandable to be used as a base for further expansion of functionality based on requirements of certain solution. So in the basic skeleton code no exceptions for columns, no overrides, no surrogate key support are included to keep it simple.
The skeleton also suppose usage of a single schema (on database schema where the transformation views, the Simplelload package and target tables are, what can be simply customized for different topology.
Patterns
INS inserts all source rows into target. It is useful for fact tables or to refill tables after truncate.
DIFFMRG inserts or updates target if different.
DIFFINS inserts into target if new.
DIFFUPD updates target if different.
FULLMRG performs full refresh transformation supporting all INSERT, UPDATE, DELETE parts.
FULLUPD performs full refresh transformation supporting UPDATE and DELETE parts only.
FULLSCD supports VALID FROM ... VALID TO history pattern.
Special columns The LOAD function transfers all columns what are both in the source view and in the target table (by name).
Following exceptions - columns with special meaning - are still considered:
UPDATED_DATETIME Timestamp of last update of the record, identified as regexp_like([column name],'UPDATED_DATETIME$')
INSERTED_DATETIME Timestamp of inserting of the record, identified as regexp_like([column name],'INSERTED_DATETIME$')
VALID_FROM specifies start of business validity of the row version in versioned tables, identified as regexp_like([column name],'VALID_FROM$')
VALID_TO specifies end of business validity of the row version in versioned tables, identified as regexp_like([column name],'VALID_TO$'). Default infinity: date'3000-01-01'
DELETED_FLAG Supports soft delete approach, when deleted records remain in the table with the flag 'Y', identified as regexp_like([column name],'DELETED_FLAG$')
Usage
Begin
SIMPLELOAD.Load(<source view name>,<target table name>,<current date>,<pattern>);
End;
/
Commit;
What next?
Try it
Customize it
Enjoy it
Create or replace package SIMPLELOAD as
-------------------------------------------------------
--constants
c_regexp_UPDDT Varchar2(100 CHAR):='UPDATED_DATETIME$';
c_regexp_INSDT Varchar2(100 CHAR):='INSERTED_DATETIME$';
c_regexp_VALFR Varchar2(100 CHAR):='VALID_FROM$';
c_regexp_VALTO Varchar2(100 CHAR):='VALID_TO$';
c_regexp_DELFL Varchar2(100 CHAR):='DELETED_FLAG$';
c_regexp_SQKEY Varchar2(100 CHAR):='_KEY$'; --Regexp mask for surrogate key identification (FULLSCD)
c_regexp_KEY_p Varchar2(100 CHAR):='_[NUP]K$'; --Priority for natural key identification by name
c_limdat_VALFR Varchar2(100 CHAR):='date''1000-01-01''';
c_limdat_VALTO Varchar2(100 CHAR):='date''3000-01-01''';
-------------------------------------------------------
--methods
function GetStmt( -- Get SQL statement of the Simple Load action
p_source_name Varchar2 --Source table
,p_target_name Varchar2 --Target table
,p_current_date Date --Current (business) date relevant for the transformation
,p_pattern Varchar2 --Pattern can be one of:
-- INS ... INSERTS into target
-- DIFFMRG ... INSERTS or UPDATES target if different
-- DIFFINS ... INSERTS into target if new
-- DIFFUPD ... UPDATES target if different
-- FULLMRG ... Full refresh transformation supporting all INSERT, UPDATE, DELETE parts
-- FULLUPD ... Full refresh transformation supporting UPDATE and DELETE parts only
-- FULLSCD ... VALID FROM ... VALID TO history pattern, should be followed by the CREEPING one
-- CREEPING ... Creepeng death algorythm to close validity after the FULLSCD pattern
,p_directives Varchar2 --Pattern directives ... not supported yet
)return CLOB; -- The SQL statement
-------------------------------------------------------
procedure Load( -- Get SQL statement of the Simple Load action
p_source_name Varchar2 --Source table
,p_target_name Varchar2 --Target table
,p_current_date Date --Current (business) date relevant for the transformation
,p_pattern Varchar2 --Pattern can be one of:
-- INS ... INSERTS into target
-- DIFFMRG ... INSERTS or UPDATES target if different
-- DIFFINS ... INSERTS into target if new
-- DIFFUPD ... UPDATES target if different
-- FULLMRG ... Full refresh transformation supporting all INSERT, UPDATE, DELETE parts
-- FULLUPD ... Full refresh transformation supporting UPDATE and DELETE parts only
-- FULLSCD ... VALID FROM ... VALID TO history pattern
,p_directives Varchar2:=null --Pattern directives ... not supported yet
);
-------------------------------------------------------
End SIMPLELOAD;
/
--
Create or replace package body SIMPLELOAD as
-------------------------------------------------------
function GetStmt( -- Get SQL statement of the Simple Load action
p_source_name Varchar2 --Source table
,p_target_name Varchar2 --Target table
,p_current_date Date --Current (business) date relevant for the transformation
,p_pattern Varchar2 --Pattern can be one of:
,p_directives Varchar2 --Pattern directives ... not supported yet
)return CLOB is
--variables etc
v_instrg_list Varchar2(32767 BYTE);
v_inssrc_list Varchar2(32767 BYTE);
v_full_list Varchar2(32767 BYTE);
v_upddif_list Varchar2(32767 BYTE);
v_on_list Varchar2(32767 BYTE);
v_upd_list Varchar2(32767 BYTE);
v_fullwhere Varchar2(4000 CHAR);
v_valid_from Varchar2(30 CHAR);
v_valid_thru Varchar2(30 CHAR);
v_sign_key Varchar2(30 CHAR);
v_upd_dt Varchar2(30 CHAR);
v_TheCode CLOB;
v_isFull Boolean:=(p_pattern like 'FULL%');
v_isCreep Boolean:=(p_pattern like 'CREEPING%');
--
v_ins_deli Varchar2(4 CHAR);
v_dif_deli Varchar2(5 CHAR);
v_on_deli Varchar2(7 CHAR);
v_upd_deli Varchar2(4 CHAR);
--
v_is_used Boolean;
v_is_upd Boolean;
v_sourcexp Varchar2(4000 CHAR);
v_fullexp Varchar2(4000 CHAR);
Begin
-- delimiters ---
v_ins_deli:=' ';
v_dif_deli:=' ';
v_on_deli :=' ';
v_upd_deli:=' ';
-----------------
for r1 in (
With COL as( --columns
Select trg.COLUMN_NAME
,case when src.COLUMN_NAME is null then 'N' else 'Y' end as MAPPED
,trg.COLUMN_ID
,trg.NULLABLE
from USER_TAB_COLUMNS trg
left join USER_TAB_COLUMNS src on src.TABLE_NAME=p_source_name and src.COLUMN_NAME=trg.COLUMN_NAME
where trg.TABLE_NAME=p_target_name
)
,IDX as( --indexes
Select ix.INDEX_NAME, ixc.COLUMN_NAME
from USER_INDEXES ix
join USER_IND_COLUMNS ixc on ixc.INDEX_NAME=ix.INDEX_NAME
where ix.TABLE_NAME=p_target_name and ix.UNIQUENESS='UNIQUE'
)
,IDX_EX as(
Select distinct IDX.INDEX_NAME
from IDX
left join COL on COL.COLUMN_NAME=IDX.COLUMN_NAME and (COL.MAPPED='Y' or regexp_like(COL.COLUMN_NAME,c_regexp_VALFR) or regexp_like(COL.COLUMN_NAME,c_regexp_VALTO))
where COL.COLUMN_NAME is null --antijoin
)
,IDX_KEY as(
Select
min(ix.INDEX_NAME)keep(dense_rank last order by Case when regexp_like(ix.INDEX_NAME,c_regexp_KEY_p) then 1 else 2 end,ix.INDEX_NAME) as INDEX_NAME
from USER_INDEXES ix
left join IDX_EX on IDX_EX.INDEX_NAME=ix.INDEX_NAME --antijoin
where ix.TABLE_NAME=p_target_name and IDX_EX.INDEX_NAME is null
)
Select
COL.COLUMN_NAME
,COL.MAPPED
,Case when IDX.COLUMN_NAME is null then 'N' else 'Y' end as IKEY
,Case when COL.NULLABLE='Y' then 'N' else 'Y' end as IMAND
from IDX_KEY
cross join COL
left join IDX on IDX.COLUMN_NAME=COL.COLUMN_NAME and IDX.INDEX_NAME=IDX_KEY.INDEX_NAME
order by Case when IDX.COLUMN_NAME is null then 2 else 1 end, COL.COLUMN_ID
) LOOP
v_is_upd:=(r1.IKEY='N');
if r1.MAPPED='Y' then
v_is_used:=true;
v_sourcexp:='SRC$.'||r1.COLUMN_NAME;
if v_is_upd then
if r1.IMAND='Y' and not v_isFull then
v_upddif_list:=v_upddif_list||v_dif_deli||'TRG$.'||r1.COLUMN_NAME||'!=SRC$.'||r1.COLUMN_NAME;
else
v_upddif_list:=v_upddif_list||v_dif_deli||'((TRG$.'||r1.COLUMN_NAME||' is not null or SRC$.'||r1.COLUMN_NAME||' is not null) and LNNVL(TRG$.'||r1.COLUMN_NAME||'=SRC$.'||r1.COLUMN_NAME||'))';
end if;
v_dif_deli:=chr(10)||' or';
v_fullexp:='case when SRC$.'||v_sign_key||' is null then TRG$.'||r1.COLUMN_NAME||' else SRC$.'||r1.COLUMN_NAME||' end as '||r1.COLUMN_NAME;
elsif r1.IKEY='Y' then--key values
v_on_list:=v_on_list||v_on_deli||'TRG$.'||r1.COLUMN_NAME||'=SRC$.'||r1.COLUMN_NAME;
v_on_deli:=chr(10)||' and ';
if v_sign_key is null then
v_sign_key:=r1.COLUMN_NAME;
end if;
v_fullexp:='nvl(SRC$.'||r1.COLUMN_NAME||',TRG$.'||r1.COLUMN_NAME||') as '||r1.COLUMN_NAME;
end if;
else
Case
when regexp_like(r1.COLUMN_NAME,c_regexp_UPDDT) then
v_sourcexp:='sysdate';
v_fullexp:='sysdate as '||r1.COLUMN_NAME;
v_upd_dt:=r1.COLUMN_NAME;
v_is_upd :=true;
v_is_used:=true;
when regexp_like(r1.COLUMN_NAME,c_regexp_INSDT) then
v_sourcexp:='sysdate';
v_fullexp:='NVL(TRG$.'||r1.COLUMN_NAME||',sysdate) as '||r1.COLUMN_NAME;
v_is_upd :=false;
v_is_used:=true;
when regexp_like(r1.COLUMN_NAME,c_regexp_VALFR) then
v_valid_from:=r1.COLUMN_NAME;
if p_pattern ='FULLSCD' then
v_is_used:=true;
v_fullexp:='date'''||to_char(p_current_date,'YYYY-MM-DD')||''' as '||r1.COLUMN_NAME;
else
v_is_used:=false;
end if;
when regexp_like(r1.COLUMN_NAME,c_regexp_VALTO) then
v_valid_thru:=r1.COLUMN_NAME;
if p_pattern ='FULLSCD' then
v_is_used:=true;
v_fullexp:=c_limdat_VALTO||' as '||r1.COLUMN_NAME;
else
v_is_used:=false;
end if;
when regexp_like(r1.COLUMN_NAME,c_regexp_DELFL) then
v_sourcexp:='''N''';
v_fullexp:='case when SRC$.'||v_sign_key||' is null then ''Y'' else ''N'' end as '||r1.COLUMN_NAME;
v_fullwhere:='(SRC$.'||v_sign_key||' is null and TRG$.'||r1.COLUMN_NAME||'=''N'')'
||chr(10)||' or(SRC$.'||v_sign_key||' is not null and TRG$.'||r1.COLUMN_NAME||'=''Y'')';
v_is_used:=true;
when regexp_like(r1.COLUMN_NAME,c_regexp_SQKEY) and p_pattern ='FULLSCD' then --surrogate key support
Select DATA_DEFAULT into v_fullexp from USER_TAB_COLUMNS tc where tc.TABLE_NAME=p_target_name and tc.COLUMN_NAME=r1.COLUMN_NAME;
if upper(v_fullexp) like '%NEXTVAL%' then
v_sourcexp:='';
v_fullexp:='TRG$.'||r1.COLUMN_NAME;
v_is_used:=true;
else
v_is_used:=false;
end if;
else v_is_used:=false;
end case;
end if;
if v_is_used then
if v_isFull then --override
v_sourcexp:='SRC$.'||r1.COLUMN_NAME;
end if;
if not v_isCreep or r1.IKEY='Y' then
v_instrg_list:=v_instrg_list||v_ins_deli||'TRG$.'||r1.COLUMN_NAME;
v_inssrc_list:=v_inssrc_list||v_ins_deli||v_sourcexp;
v_full_list :=v_full_list ||v_ins_deli||v_fullexp;
v_ins_deli:=chr(10)||' ,';
end if;
if v_is_upd then
v_upd_list:=v_upd_list||v_upd_deli||'TRG$.'||r1.COLUMN_NAME||'='||v_sourcexp;
v_upd_deli:=chr(10)||' ,';
end if;
end if;
end LOOP;
--checks--
if v_sign_key is null and p_pattern not in ('INS') then
raise_application_error(-20001,'SimpleLoad: Relevant key not found');
end if;
----------
If p_pattern in ('DIFFMRG','DIFFINS','DIFFUPD') then
v_TheCode:='Merge into '||p_target_name||' TRG$'
||chr(10)||'using '||p_source_name||' SRC$'
||chr(10)||'on ('
||chr(10)||v_on_list
||chr(10)||')';
if p_pattern in ('DIFFMRG','DIFFUPD') then
v_TheCode:=v_TheCode
||chr(10)||'when matched then update set'
||chr(10)||v_upd_list
||chr(10)||'where'
||chr(10)||v_upddif_list;
end if;
if p_pattern in ('DIFFMRG','DIFFINS') then
v_TheCode:=v_TheCode
||chr(10)||'when not matched then insert('
||chr(10)||v_instrg_list
||chr(10)||')values('
||chr(10)||v_inssrc_list
||chr(10)||')';
end if;
elsif p_pattern='INS' then
v_TheCode:='Insert into '||p_target_name||' TRG$('
||chr(10)||v_instrg_list
||chr(10)||') select'
||chr(10)||v_inssrc_list
||chr(10)||'from '||p_source_name||' SRC$';
elsif p_pattern in ('FULLMRG','FULLUPD') then
v_TheCode:='Merge into '||p_target_name||' TRG$'
||chr(10)||'using('
||chr(10)||'select'
||chr(10)||v_full_list
||chr(10)||'from '||p_source_name||' SRC$'
||chr(10)||case when p_pattern='FULLMRG' then 'full' else 'right' end||' join '||p_target_name||' TRG$ on'
||chr(10)||v_on_list
||chr(10)||'where'
||chr(10)||v_upddif_list
||v_dif_deli||v_fullwhere
||chr(10)||') SRC$'
||chr(10)||'on ('
||chr(10)||v_on_list
||chr(10)||')'
||chr(10)||'when matched then update set'
||chr(10)||v_upd_list;
if p_pattern='FULLMRG' then
v_TheCode:=v_TheCode
||chr(10)||'when not matched then insert('
||chr(10)||v_instrg_list
||chr(10)||')values('
||chr(10)||v_inssrc_list
||chr(10)||')';
end if;
elsif p_pattern ='FULLSCD' then
if v_valid_from is null then
raise_application_error(-20002,'SimpleLoad: The VALID_FROM column is necessary in the '||p_pattern);
end if;
if v_valid_thru is null then
raise_application_error(-20003,'SimpleLoad: The VALID_TO column is necessary in the '||p_pattern);
end if;
v_TheCode:='INSERT into '||p_target_name||' TRG$('
||chr(10)||v_instrg_list
||chr(10)||')select'
||chr(10)||v_full_list
||chr(10)||'from '||p_source_name||' SRC$'
||chr(10)||'full join(select * from '||p_target_name||' where '||v_valid_thru||'='||c_limdat_VALTO||') TRG$ on'
||chr(10)||v_on_list
||chr(10)||'where'
||chr(10)||v_upddif_list
||v_dif_deli||v_fullwhere;
elsif p_pattern ='CREEPING' then
v_TheCode:='Merge into '||p_target_name||' TRG$'
||chr(10)||'using('
||chr(10)||'select'
||chr(10)||v_instrg_list
||v_ins_deli||'TRG$.'||v_valid_from
||v_ins_deli||'lead(TRG$.'||v_valid_from||')over(partition by '||replace(replace(v_instrg_list,chr(10)),' ') ||' order by TRG$.'||v_valid_from||') as NEXT$'
||chr(10)||'from '||p_target_name||' TRG$'
||chr(10)||'where TRG$.'||v_valid_thru||'='||c_limdat_VALTO
||chr(10)||') SRC$'
||chr(10)||'on('
||chr(10)||v_on_list
||v_on_deli||'SRC$.'||v_valid_from||'=TRG$.'||v_valid_from
||v_on_deli||'SRC$.NEXT$ is not null'
||chr(10)||')'
||chr(10)||'when matched then update set'
||chr(10)||' TRG$.'||v_valid_thru||'=SRC$.NEXT$-1'
||Case when v_upd_dt is not null then chr(10)||' ,TRG$.'||v_upd_dt||'=sysdate' end;
else
raise_application_error(-20004,'SimpleLoad: Unsupported pattern '||p_pattern);
End if;
return v_TheCode;
End GetStmt;
-------------------------------------------------------
procedure Load( -- Get SQL statement of the Simple Load action
p_source_name Varchar2 --Source table
,p_target_name Varchar2 --Target table
,p_current_date Date --Current (business) date relevant for the transformation
,p_pattern Varchar2 --Pattern as described at the GetStmt function
,p_directives Varchar2:=null
) is
v_STMT CLOB;
Begin
DBMS_OUTPUT.ENABLE(1000000);
v_STMT:=GetStmt(p_source_name,p_target_name,p_current_date,p_pattern,p_directives);
execute immediate v_STMT;
if p_pattern='FULLSCD' then
v_STMT:=GetStmt(p_source_name,p_target_name,p_current_date,'CREEPING',p_directives);
execute immediate v_STMT;
end if;
Exception
When others then
DBMS_OUTPUT.PUT_LINE(v_STMT);
raise;
End Load;
-------------------------------------------------------
End SIMPLELOAD;
/
Support of surrogate keys I recommend to use standard default functionality of Oracle 12g +
to support surrogate keys:
Create sequence TEST1_SEQ order;
... ,SEQ_KEY INTEGER default on null TEST1_SEQ.nextval
For the FULLSCD pattern the package supports transport of the surrogate key between versions of records.
In all patterns never mention the surrogate key column in the source view.