HomeArticlesSlideshowsLogin
NewsCustom SearchContact
Light-weight asynchronous adhoc tasks processor AHOY
    by Ludek Bob Jankovsky, 18-Jun-2020 (HEAP WORKFLOW)
Soemtimes we need to run several one-time tasks on Oracle database, just aside the standard orchestration.
Following simple solution helps to avoid makeshifts to cope with such situations again and again.
The solution is created as simple as possible, 2 tables and one package. It requires DBMS_SCHEDULER granted.
The definition table AHOY_TASK acts as the log table too. There could be necessary informative column added in the sake of particular solutions.
The parameters table AHOY_PAR contains one basic parameter APAR_THREADS allowing to define number of parallel threads or to stop running completely by setup to O(zero).
The queue works in the FIFO mode, first defined task will be first processed. No other dependencies.
The solution allows to increase and decrease number of parallel threads during runtime what allows the "watch and regulate" approach.
This is an AD HOC solution so clean the AHOY_TASK again and again after completing, there is no aging implemented in the sake of simplicity.


Metadata tables:

Create table AHOY_TASK(
  ATASK_KEY   integer generated as IDENTITY primary key
 ,ATASK_CMD   clob
 ,ATASK_STATE varchar2(20)
 ,ATASK_START timestamp
 ,ATASK_END   timestamp
 ,ATASK_MSG   clob
 ,ATASK_JOB   varchar2(100)
);
Create table AHOY_PAR(
   APAR_THREADS  integer
  ,APAR_LAST_ERR CLOB
);
Insert into AHOY_PAR(APAR_THREADS, APAR_LAST_ERR) values (0,'');
commit;

[Download]

The AHOY_PAR.APAR_THREADS parameter limits maximum number of parallel tasks. If set to 0 it means only manualy started threads will process just one task and then discontinue.
Following states are possible in the AHOY_TASK.ATASK_STATE:

  • TBD - ready to be started
  • RUN - just running
  • OK - finished successfully
  • ERR - finished unsuccessfully


Package

Create or replace package AHOY_PROC as
--AdHOc Yield processor
  procedure threadAsync;
  procedure thread(p_job_name varchar2);
End AHOY_PROC;
/
Create or replace package body AHOY_PROC as
--AdHOc Yield processor
  c_package_name  varchar2(100) := 'AHOY_PROC';
  c_thread_prefix varchar2(100) := 'AHOY_JOB_';
---  
  procedure threadAsync is    -- methods sterts one thread asynchronously and lets it work all the remaining tasks
    v_name varchar2(100);
    v_cnt  integer;
    v_max  integer; 
  begin 
    Select count(1),max(to_number(regexp_substr(job_name,'\d+$'))) into v_cnt, v_max from USER_SCHEDULER_JOBS where regexp_like(job_name,'^'||c_thread_prefix || '\d+$');   --check maximum number of AHOY threads
    v_name := c_thread_prefix||to_char(nvl(v_max,0)+1);  -- decide the name of new one incrementing the thread number  
    DBMS_SCHEDULER.CREATE_JOB(job_name => v_name, job_type => 'PLSQL_BLOCK', job_action => 'begin '||c_package_name||'.thread('''|| v_name ||''');end;', number_of_arguments => 0, start_date => sysdate) ; --create 
    DBMS_SCHEDULER.ENABLE(v_name) ;                                                                                                                                                                         --enable
    commit; 
  end threadAsync;
---  
  procedure thread(p_job_name varchar2) is  -- all these threads are homogenous, they do all tasks about dispatching and running particular jobs
     v_ataskKey   integer;
     v_ataskCmd   clob;
     v_ataskCnt   integer;
     v_err        clob;   
     v_maxThreads integer;
     v_curThreads integer;
  begin
    loop
      --SEEK THE TASK and CHECK ORPHANS
      lock table AHOY_PAR in exclusive mode; --lock for task manipulation (dummy lock on auxiliary table just to avoid two threads gathering the same task) 
      with L1 as(
         select A.ATASK_KEY, A.ATASK_CMD, row_number()over(order by A.ATASK_KEY) rn, count(1)over(partition by A.ATASK_STATE) cnt   
         from AHOY_TASK A
         left join USER_SCHEDULER_JOBS  J on J.job_name = A.ATASK_JOB and J.job_name != p_job_name  -- antijoin, check orphans. Orphans are unfinished tasks caused by a forced drop of jobs rather than by an internal error. 
         where a.ATASK_STATE = 'TBD' or (a.ATASK_STATE = 'RUN' and J.job_name is null) --for the state RUN with no job related it behaves like if it was TBD.
        )
        select ATASK_KEY, ATASK_CMD, cnt into v_ataskKey, v_ataskCmd, v_ataskCnt from L1 where rn=1;  -- just for the first eligible task
      update  AHOY_TASK set ATASK_STATE = 'RUN', ATASK_JOB = p_job_name, ATASK_START = systimestamp where ATASK_KEY=v_ataskKey; -- notify the task is to be started
      commit; --unlocks the task seeking process for other threads, this one is booked by current thread by the update
      --WAKE UP COWORKERS 
      select APAR_THREADS into v_maxThreads from AHOY_PAR;  --get maximum number of threads from the parameter table. It should be done again because it could be changed from outside to stop, reduce, or increase number of threads
      select count(1) into v_curThreads from USER_SCHEDULER_JOBS where regexp_like(job_name,'^'||c_thread_prefix || '\d+$');  -- get number of currently running threads
      if v_ataskCnt > 1 and v_maxThreads > v_curThreads then  -- if current number of threads is less than the maximum one and there are more tasks waiting
        threadAsync;  -- it starts another thread - just one, the process will recure with the new thread if necessary
      end if;
      --PROCESSING
      begin --inner fault processing
        execute immediate v_ataskCmd;  -- dynamic run of the command 
        update  AHOY_TASK set ATASK_STATE = 'OK', ATASK_END = systimestamp where ATASK_KEY=v_ataskKey; -- check as OK after successful finish
        commit;  -- outside commit, the best practice is not to commit transactions within tasks themselves, just keep them intra-transaction
      exception 
        when others then  -- any error will be documented
          v_err := SQLERRM;
          update  AHOY_TASK set ATASK_STATE = 'ERR', ATASK_END = systimestamp, ATASK_MSG = v_err where ATASK_KEY=v_ataskKey; -- here
          commit;
      end; --/inner fault processing
      --- ASLEEP
      if v_maxThreads < v_curThreads then 
        raise NO_DATA_FOUND;  --end loop, job
      end if;
    end loop;   
  exception   
    when NO_DATA_FOUND then null; -- expected exit, no data to process
    when others then --unexpected errors will be documented in the parameter table AHOY_PAR
      v_err := SQLERRM;
      update  AHOY_PAR set APAR_LAST_ERR = v_err;
      commit;
  end thread;
End AHOY_PROC;
/

[Download]


Processing

--Insert task into queue (dummy tasks example)

begin 
  for i in 1..50 loop
    Insert into AHOY_TASK(ATASK_CMD,ATASK_STATE)values('begin DBMS_LOCK.sleep(100); end;','TBD');
  end loop;
end ;
/


--Allow 4 tasks simultanously

update AHOY_PAR set APAR_THREADS = 4;
commit;


--Start tasks

begin 
  AHOY_PROC.threadAsync;
end;
/


--stop all tasks after finished

update AHOY_PAR set APAR_THREADS = 0;
commit;



--check running tasks

Select * from AHOY_TASK order by 1;


--check jobs

Select * from USER_SCHEDULER_JOBS where regexp_like(job_name,'^AHOY_JOB_\d+$');


Ludek Bob Jankovsky
All Right Reserved © 2007, Designed by Bob Jankovsky