HomeArticlesOthersLogin
NewsCustom SearchContact
SIMPLELOAD - Elementary 'last phase' solution for ETL
    by Ludek Bob Jankovsky, 31-May-2017 (ETL PATTERNS)
Despite on poorest of the solution in comparison to true metadata driven ETL, I often meet transformations realized as views with almost the same format as the target table, followed by the pattern based implementation of the final stage - loading the transformed source data into target table, considering current data and maintaining history.
I have to admit the smart approach is proper for small and quick win solutions.
For easy implementation I scripted a skeleton of the last phase load element based on data dictionary information, supporting the most common patterns.


Skeleton?
It is improbable I could predict all possible needs of particular solutions, so I decided to start with the simplest code depicting the concept, but still simple and understandable to be used as a base for further expansion of functionality based on requirements of certain solution. So in the basic skeleton code no exceptions for columns, no overrides, no surrogate key support are included to keep it simple.
The skeleton also suppose usage of a single schema (on database schema where the transformation views, the Simplelload package and target tables are, what can be simply customized for different topology.

Patterns
  • INS inserts all source rows into target. It is useful for fact tables or to refill tables after truncate.
  • DIFFMRG inserts or updates target if different.
  • DIFFINS inserts into target if new.
  • DIFFUPD updates target if different.
  • FULLMRG performs full refresh transformation supporting all INSERT, UPDATE, DELETE parts.
  • FULLUPD performs full refresh transformation supporting UPDATE and DELETE parts only.
  • FULLSCD supports VALID FROM ... VALID TO history pattern.

Special columns
The LOAD function transfers all columns what are both in the source view and in the target table (by name).
Following exceptions - columns with special meaning - are still considered:
  • UPDATED_DATETIME Timestamp of last update of the record, identified as regexp_like([column name],'UPDATED_DATETIME$')
  • INSERTED_DATETIME Timestamp of inserting of the record, identified as regexp_like([column name],'INSERTED_DATETIME$')
  • VALID_FROM specifies start of business validity of the row version in versioned tables, identified as regexp_like([column name],'VALID_FROM$')
  • VALID_TO specifies end of business validity of the row version in versioned tables, identified as regexp_like([column name],'VALID_TO$'). Default infinity: date'3000-01-01'
  • DELETED_FLAG Supports soft delete approach, when deleted records remain in the table with the flag 'Y', identified as regexp_like([column name],'DELETED_FLAG$')


Usage

Begin
  SIMPLELOAD.Load(<source view name>,<target table name>,<current date>,<pattern>);
End;
/
Commit;


What next?

  • Try it
  • Customize it
  • Enjoy it

Create or replace package SIMPLELOAD as
------------------------------------------------------- 
--constants 
   c_regexp_UPDDT Varchar2(100 CHAR):='UPDATED_DATETIME$';
   c_regexp_INSDT Varchar2(100 CHAR):='INSERTED_DATETIME$';
   c_regexp_VALFR Varchar2(100 CHAR):='VALID_FROM$';
   c_regexp_VALTO Varchar2(100 CHAR):='VALID_TO$';
   c_regexp_DELFL Varchar2(100 CHAR):='DELETED_FLAG$';     
   c_regexp_SQKEY Varchar2(100 CHAR):='_KEY$';    --Regexp mask for surrogate key identification (FULLSCD)   
   c_regexp_KEY_p Varchar2(100 CHAR):='_[NUP]K$'; --Priority for natural key identification by name   
   c_limdat_VALFR Varchar2(100 CHAR):='date''1000-01-01''';     
   c_limdat_VALTO Varchar2(100 CHAR):='date''3000-01-01''';     
------------------------------------------------------- 
--methods
  function GetStmt( -- Get SQL statement of the Simple Load action
     p_source_name  Varchar2 --Source table  
    ,p_target_name  Varchar2 --Target table
    ,p_current_date Date     --Current (business) date relevant for the transformation 
    ,p_pattern      Varchar2 --Pattern can be one of:
                             --  INS       ... INSERTS into target 
                             --  DIFFMRG   ... INSERTS or UPDATES  target if different 
                             --  DIFFINS   ... INSERTS into target if new 
                             --  DIFFUPD   ... UPDATES target if different 
                             --  FULLMRG   ... Full refresh transformation supporting all INSERT, UPDATE, DELETE parts 
                             --  FULLUPD   ... Full refresh transformation supporting UPDATE and DELETE parts only
                             --  FULLSCD   ... VALID FROM ... VALID TO  history pattern, should be followed by the CREEPING one
                             --  CREEPING  ... Creepeng death algorythm to close validity after the FULLSCD pattern 
    ,p_directives   Varchar2 --Pattern directives ... not supported yet  
  )return CLOB;  -- The SQL statement
------------------------------------------------------- 
  procedure Load( -- Get SQL statement of the Simple Load action
     p_source_name  Varchar2 --Source table  
    ,p_target_name  Varchar2 --Target table
    ,p_current_date Date     --Current (business) date relevant for the transformation 
    ,p_pattern      Varchar2 --Pattern can be one of: 
                             --  INS       ... INSERTS into target 
                             --  DIFFMRG   ... INSERTS or UPDATES  target if different 
                             --  DIFFINS   ... INSERTS into target if new 
                             --  DIFFUPD   ... UPDATES target if different 
                             --  FULLMRG   ... Full refresh transformation supporting all INSERT, UPDATE, DELETE parts 
                             --  FULLUPD   ... Full refresh transformation supporting UPDATE and DELETE parts only
                             --  FULLSCD   ... VALID FROM ... VALID TO  history pattern
    ,p_directives   Varchar2:=null --Pattern directives ... not supported yet
  );    
------------------------------------------------------- 
End SIMPLELOAD;
/
--
Create or replace package body SIMPLELOAD as
------------------------------------------------------- 
  function GetStmt( -- Get SQL statement of the Simple Load action
     p_source_name  Varchar2 --Source table  
    ,p_target_name  Varchar2 --Target table
    ,p_current_date Date     --Current (business) date relevant for the transformation 
    ,p_pattern      Varchar2 --Pattern can be one of:
    ,p_directives   Varchar2 --Pattern directives ... not supported yet  
  )return CLOB is
  --variables etc 
   v_instrg_list  Varchar2(32767 BYTE); 
   v_inssrc_list  Varchar2(32767 BYTE); 
   v_full_list    Varchar2(32767 BYTE); 
   v_upddif_list  Varchar2(32767 BYTE); 
   v_on_list      Varchar2(32767 BYTE); 
   v_upd_list     Varchar2(32767 BYTE); 
   v_fullwhere    Varchar2(4000  CHAR); 
   v_valid_from   Varchar2(30 CHAR); 
   v_valid_thru   Varchar2(30 CHAR); 
   v_sign_key     Varchar2(30 CHAR); 
   v_upd_dt       Varchar2(30 CHAR); 
   v_TheCode      CLOB; 
   v_isFull       Boolean:=(p_pattern like 'FULL%'); 
   v_isCreep      Boolean:=(p_pattern like 'CREEPING%'); 
  -- 
   v_ins_deli     Varchar2(4 CHAR); 
   v_dif_deli     Varchar2(5 CHAR); 
   v_on_deli      Varchar2(7 CHAR); 
   v_upd_deli     Varchar2(4 CHAR); 
   -- 
   v_is_used      Boolean; 
   v_is_upd       Boolean; 
   v_sourcexp     Varchar2(4000 CHAR); 
   v_fullexp      Varchar2(4000 CHAR); 
  Begin 
    -- delimiters --- 
    v_ins_deli:='   '; 
    v_dif_deli:='    '; 
    v_on_deli :='      '; 
    v_upd_deli:='   '; 
    ----------------- 
    for r1 in ( 
      With COL as( --columns 
        Select trg.COLUMN_NAME 
          ,case when src.COLUMN_NAME is null then 'N' else 'Y' end as MAPPED 
          ,trg.COLUMN_ID 
          ,trg.NULLABLE 
        from USER_TAB_COLUMNS trg 
        left join USER_TAB_COLUMNS src on src.TABLE_NAME=p_source_name and src.COLUMN_NAME=trg.COLUMN_NAME 
        where trg.TABLE_NAME=p_target_name 
      ) 
      ,IDX as( --indexes 
        Select ix.INDEX_NAME, ixc.COLUMN_NAME 
        from USER_INDEXES ix 
        join USER_IND_COLUMNS ixc on ixc.INDEX_NAME=ix.INDEX_NAME 
        where ix.TABLE_NAME=p_target_name and ix.UNIQUENESS='UNIQUE' 
      ) 
      ,IDX_EX as( 
        Select distinct IDX.INDEX_NAME 
        from IDX 
        left join COL on COL.COLUMN_NAME=IDX.COLUMN_NAME and (COL.MAPPED='Y' or regexp_like(COL.COLUMN_NAME,c_regexp_VALFR) or regexp_like(COL.COLUMN_NAME,c_regexp_VALTO))
        where COL.COLUMN_NAME is null --antijoin 
      ) 
      ,IDX_KEY as( 
        Select 
          min(ix.INDEX_NAME)keep(dense_rank last order by Case when regexp_like(ix.INDEX_NAME,c_regexp_KEY_p) then 1 else 2 end,ix.INDEX_NAME) as INDEX_NAME 
        from USER_INDEXES ix 
        left join IDX_EX on IDX_EX.INDEX_NAME=ix.INDEX_NAME --antijoin 
        where ix.TABLE_NAME=p_target_name and IDX_EX.INDEX_NAME is null 
      ) 
      Select 
         COL.COLUMN_NAME 
        ,COL.MAPPED 
        ,Case when IDX.COLUMN_NAME is null then 'N' else 'Y' end as IKEY 
        ,Case when COL.NULLABLE='Y' then 'N' else 'Y' end as IMAND 
      from IDX_KEY 
      cross join COL   
      left join IDX on IDX.COLUMN_NAME=COL.COLUMN_NAME and IDX.INDEX_NAME=IDX_KEY.INDEX_NAME 
      order by Case when IDX.COLUMN_NAME is null then 2 else 1 end, COL.COLUMN_ID 
    ) LOOP 
      v_is_upd:=(r1.IKEY='N'); 
      if r1.MAPPED='Y' then 
        v_is_used:=true; 
        v_sourcexp:='SRC$.'||r1.COLUMN_NAME; 
        if v_is_upd then 
          if r1.IMAND='Y' and not v_isFull then 
            v_upddif_list:=v_upddif_list||v_dif_deli||'TRG$.'||r1.COLUMN_NAME||'!=SRC$.'||r1.COLUMN_NAME; 
          else 
            v_upddif_list:=v_upddif_list||v_dif_deli||'((TRG$.'||r1.COLUMN_NAME||' is not null or SRC$.'||r1.COLUMN_NAME||' is not null) and LNNVL(TRG$.'||r1.COLUMN_NAME||'=SRC$.'||r1.COLUMN_NAME||'))'; 
          end if;   
          v_dif_deli:=chr(10)||'  or'; 
          v_fullexp:='case when SRC$.'||v_sign_key||' is null then TRG$.'||r1.COLUMN_NAME||' else SRC$.'||r1.COLUMN_NAME||' end as '||r1.COLUMN_NAME; 
        elsif r1.IKEY='Y' then--key values   
          v_on_list:=v_on_list||v_on_deli||'TRG$.'||r1.COLUMN_NAME||'=SRC$.'||r1.COLUMN_NAME; 
          v_on_deli:=chr(10)||'  and '; 
          if v_sign_key is null then 
            v_sign_key:=r1.COLUMN_NAME; 
          end if; 
          v_fullexp:='nvl(SRC$.'||r1.COLUMN_NAME||',TRG$.'||r1.COLUMN_NAME||') as '||r1.COLUMN_NAME; 
        end if; 
      else   
        Case 
          when regexp_like(r1.COLUMN_NAME,c_regexp_UPDDT) then 
            v_sourcexp:='sysdate'; 
            v_fullexp:='sysdate as '||r1.COLUMN_NAME;
            v_upd_dt:=r1.COLUMN_NAME; 
            v_is_upd  :=true; 
            v_is_used:=true; 
          when regexp_like(r1.COLUMN_NAME,c_regexp_INSDT) then 
            v_sourcexp:='sysdate'; 
            v_fullexp:='NVL(TRG$.'||r1.COLUMN_NAME||',sysdate) as '||r1.COLUMN_NAME; 
            v_is_upd  :=false; 
            v_is_used:=true; 
          when regexp_like(r1.COLUMN_NAME,c_regexp_VALFR) then 
            v_valid_from:=r1.COLUMN_NAME; 
            if p_pattern ='FULLSCD' then 
              v_is_used:=true;
              v_fullexp:='date'''||to_char(p_current_date,'YYYY-MM-DD')||''' as '||r1.COLUMN_NAME; 
            else 
              v_is_used:=false;
            end if;   
          when regexp_like(r1.COLUMN_NAME,c_regexp_VALTO) then 
            v_valid_thru:=r1.COLUMN_NAME; 
            if p_pattern ='FULLSCD' then 
              v_is_used:=true;
              v_fullexp:=c_limdat_VALTO||' as '||r1.COLUMN_NAME; 
            else 
              v_is_used:=false;
            end if;   
          when regexp_like(r1.COLUMN_NAME,c_regexp_DELFL) then 
            v_sourcexp:='''N'''; 
            v_fullexp:='case when SRC$.'||v_sign_key||' is null then ''Y'' else ''N'' end as '||r1.COLUMN_NAME; 
            v_fullwhere:='(SRC$.'||v_sign_key||' is null and TRG$.'||r1.COLUMN_NAME||'=''N'')' 
            ||chr(10)||'  or(SRC$.'||v_sign_key||' is not null and TRG$.'||r1.COLUMN_NAME||'=''Y'')'; 
            v_is_used:=true; 
          when regexp_like(r1.COLUMN_NAME,c_regexp_SQKEY) and p_pattern ='FULLSCD' then  --surrogate key support
            Select DATA_DEFAULT into v_fullexp from USER_TAB_COLUMNS tc where tc.TABLE_NAME=p_target_name and tc.COLUMN_NAME=r1.COLUMN_NAME; 
            if upper(v_fullexp) like '%NEXTVAL%' then 
              v_sourcexp:=''; 
              v_fullexp:='TRG$.'||r1.COLUMN_NAME; 
              v_is_used:=true;
            else 
              v_is_used:=false; 
            end if; 
          else v_is_used:=false; 
        end case; 
      end if; 
      if v_is_used then 
        if v_isFull then --override 
          v_sourcexp:='SRC$.'||r1.COLUMN_NAME; 
        end if; 
        if not v_isCreep or r1.IKEY='Y' then 
          v_instrg_list:=v_instrg_list||v_ins_deli||'TRG$.'||r1.COLUMN_NAME; 
          v_inssrc_list:=v_inssrc_list||v_ins_deli||v_sourcexp; 
          v_full_list  :=v_full_list  ||v_ins_deli||v_fullexp; 
          v_ins_deli:=chr(10)||'  ,';
        end if;   
        if v_is_upd then 
          v_upd_list:=v_upd_list||v_upd_deli||'TRG$.'||r1.COLUMN_NAME||'='||v_sourcexp; 
          v_upd_deli:=chr(10)||'  ,'; 
        end if; 
      end if;   
    end LOOP; 
    --checks--
    if v_sign_key is null and p_pattern not in ('INS') then  
      raise_application_error(-20001,'SimpleLoad: Relevant key not found'); 
    end if;
    ----------
    If p_pattern in ('DIFFMRG','DIFFINS','DIFFUPD') then 
      v_TheCode:='Merge into '||p_target_name||' TRG$' 
      ||chr(10)||'using '||p_source_name||' SRC$' 
      ||chr(10)||'on (' 
      ||chr(10)||v_on_list 
      ||chr(10)||')'; 
      if  p_pattern in ('DIFFMRG','DIFFUPD') then 
        v_TheCode:=v_TheCode 
        ||chr(10)||'when matched then update set' 
        ||chr(10)||v_upd_list 
        ||chr(10)||'where' 
        ||chr(10)||v_upddif_list; 
      end if;   
      if  p_pattern in ('DIFFMRG','DIFFINS') then 
        v_TheCode:=v_TheCode 
        ||chr(10)||'when not matched then insert(' 
        ||chr(10)||v_instrg_list 
        ||chr(10)||')values(' 
        ||chr(10)||v_inssrc_list 
        ||chr(10)||')'; 
      end if;   
    elsif p_pattern='INS' then 
      v_TheCode:='Insert into '||p_target_name||' TRG$(' 
      ||chr(10)||v_instrg_list 
      ||chr(10)||') select' 
      ||chr(10)||v_inssrc_list 
      ||chr(10)||'from '||p_source_name||' SRC$'; 
    elsif  p_pattern in ('FULLMRG','FULLUPD') then 
      v_TheCode:='Merge into '||p_target_name||' TRG$' 
      ||chr(10)||'using(' 
      ||chr(10)||'select' 
      ||chr(10)||v_full_list 
      ||chr(10)||'from '||p_source_name||' SRC$' 
      ||chr(10)||case when p_pattern='FULLMRG' then 'full' else 'right' end||' join '||p_target_name||' TRG$ on' 
      ||chr(10)||v_on_list 
      ||chr(10)||'where' 
      ||chr(10)||v_upddif_list 
      ||v_dif_deli||v_fullwhere 
      ||chr(10)||') SRC$' 
      ||chr(10)||'on (' 
      ||chr(10)||v_on_list 
      ||chr(10)||')' 
      ||chr(10)||'when matched then update set' 
      ||chr(10)||v_upd_list; 
      if p_pattern='FULLMRG' then 
        v_TheCode:=v_TheCode 
        ||chr(10)||'when not matched then insert(' 
        ||chr(10)||v_instrg_list 
        ||chr(10)||')values(' 
        ||chr(10)||v_inssrc_list 
        ||chr(10)||')'; 
      end if;   
    elsif  p_pattern ='FULLSCD' then
      if v_valid_from is null then  
        raise_application_error(-20002,'SimpleLoad: The VALID_FROM column is necessary in the '||p_pattern); 
      end if;
      if v_valid_thru is null then  
        raise_application_error(-20003,'SimpleLoad: The VALID_TO column is necessary in the '||p_pattern); 
      end if;
      v_TheCode:='INSERT into '||p_target_name||' TRG$(' 
      ||chr(10)||v_instrg_list 
      ||chr(10)||')select' 
      ||chr(10)||v_full_list 
      ||chr(10)||'from '||p_source_name||' SRC$' 
      ||chr(10)||'full join(select * from '||p_target_name||' where '||v_valid_thru||'='||c_limdat_VALTO||') TRG$ on' 
      ||chr(10)||v_on_list 
      ||chr(10)||'where' 
      ||chr(10)||v_upddif_list 
      ||v_dif_deli||v_fullwhere; 
    elsif  p_pattern ='CREEPING' then
      v_TheCode:='Merge into '||p_target_name||' TRG$' 
      ||chr(10)||'using(' 
      ||chr(10)||'select' 
      ||chr(10)||v_instrg_list 
      ||v_ins_deli||'TRG$.'||v_valid_from
      ||v_ins_deli||'lead(TRG$.'||v_valid_from||')over(partition by '||replace(replace(v_instrg_list,chr(10)),' ') ||' order by TRG$.'||v_valid_from||') as NEXT$' 
      ||chr(10)||'from '||p_target_name||' TRG$' 
      ||chr(10)||'where TRG$.'||v_valid_thru||'='||c_limdat_VALTO
      ||chr(10)||') SRC$' 
      ||chr(10)||'on('
      ||chr(10)||v_on_list 
      ||v_on_deli||'SRC$.'||v_valid_from||'=TRG$.'||v_valid_from 
      ||v_on_deli||'SRC$.NEXT$ is not null' 
      ||chr(10)||')' 
      ||chr(10)||'when matched then update set' 
      ||chr(10)||'  TRG$.'||v_valid_thru||'=SRC$.NEXT$-1'
      ||Case when v_upd_dt is not null then chr(10)||' ,TRG$.'||v_upd_dt||'=sysdate' end; 
    else 
      raise_application_error(-20004,'SimpleLoad: Unsupported pattern '||p_pattern); 
    End if; 
    return v_TheCode; 
  End GetStmt; 
------------------------------------------------------- 
  procedure Load( -- Get SQL statement of the Simple Load action
     p_source_name  Varchar2 --Source table  
    ,p_target_name  Varchar2 --Target table
    ,p_current_date Date     --Current (business) date relevant for the transformation 
    ,p_pattern      Varchar2 --Pattern as described at the GetStmt  function 
    ,p_directives   Varchar2:=null
  ) is
    v_STMT CLOB;
  Begin
    DBMS_OUTPUT.ENABLE(1000000);
    v_STMT:=GetStmt(p_source_name,p_target_name,p_current_date,p_pattern,p_directives);
    execute immediate v_STMT;
    if p_pattern='FULLSCD' then
      v_STMT:=GetStmt(p_source_name,p_target_name,p_current_date,'CREEPING',p_directives);
      execute immediate v_STMT;
    end if; 
  Exception 
    When others then 
    DBMS_OUTPUT.PUT_LINE(v_STMT);
    raise;  
  End Load;   
------------------------------------------------------- 
End SIMPLELOAD;
/

[Download]


Support of surrogate keys
I recommend to use standard default functionality of Oracle 12g + to support surrogate keys:




Create sequence TEST1_SEQ order;


 ... ,SEQ_KEY INTEGER default on null TEST1_SEQ.nextval

For the FULLSCD pattern the package supports transport of the surrogate key between versions of records.
In all patterns never mention the surrogate key column in the source view.

Ludek Bob Jankovsky
All Right Reserved © 2007, Designed by Bob Jankovsky