CREATE OR REPLACE PROCEDURE compare_tables_with_multi (
table_names IN SYS.ODCIVARCHAR2LIST,
l_batch_no IN NUMBER
) AS
sql_stmt VARCHAR2(10000);
merge_stmt CLOB;
ip_merge_stmt_1 CLOB;
ip_merge_stmt_2 CLOB;
set_clause VARCHAR2(10000);
insert_clause_dst VARCHAR2(10000);
insert_clause_src VARCHAR2(10000);
ip_tbl_last_update_col VARCHAR2(1000);
ip_tbl_create_date_col VARCHAR2(1000);
ip_tbl_primary_col VARCHAR2(1000);
resultset tbl_nmbr;
insert_rows_affected NUMBER(10);
chunk_sql VARCHAR2(4000);
task_name VARCHAR2(1000);
l_try NUMBER(5);
l_status NUMBER(5);
chunk_size NUMBER(10) := 20000;
BEGIN
FOR i IN 1..table_names.COUNT LOOP
-- Validate table_name
IF table_names(i) IS NULL OR NOT REGEXP_LIKE(table_names(i), '^[A-Za-z0-9_]+$') THEN
RAISE_APPLICATION_ERROR(-20001, 'Invalid table_name: ' || table_names(i));
END IF;
-- Verify that table_name exists in temp_date_col_mapper
SELECT COUNT(*)
INTO l_status
FROM temp_date_col_mapper
WHERE table_name = table_names(i);
IF l_status = 0 THEN
RAISE_APPLICATION_ERROR(-20002, 'Table name ' || table_names(i) || ' not found in temp_date_col_mapper');
END IF;
-- Get table's metadata
BEGIN
SELECT last_update_date_col, create_date_col, primary_key, merge_stmt_1, merge_stmt_2
INTO ip_tbl_last_update_col, ip_tbl_create_date_col, ip_tbl_primary_col, ip_merge_stmt_1, ip_merge_stmt_2
FROM temp_date_col_mapper
WHERE table_name = table_names(i)
FETCH FIRST 1 ROW ONLY;
EXCEPTION
WHEN TOO_MANY_ROWS THEN
RAISE_APPLICATION_ERROR(-20003, 'Multiple rows found in temp_date_col_mapper for table_name: ' || table_names(i));
WHEN NO_DATA_FOUND THEN
RAISE_APPLICATION_ERROR(-20004, 'No metadata found in temp_date_col_mapper for table_name: ' || table_names(i));
END;
-- Validate ip_tbl_primary_col
IF NOT REGEXP_LIKE(ip_tbl_primary_col, '^[A-Za-z0-9_]+$') THEN
RAISE_APPLICATION_ERROR(-20005, 'Invalid primary column name: ' || ip_tbl_primary_col);
END IF;
-- Define task name
task_name := SUBSTR('COMPARE_' || table_names(i) || '_' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS'), 1, 128);
-- Create the task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);
-- Chunk the table based on ROW NUMBER()
chunk_sql := 'SELECT rn AS start_id, rn + :chunk_size - 1 AS end_id ' ||
'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn ' ||
'FROM cr_5_0_x_' || table_names(i) || ' a) ' ||
'WHERE MOD(rn - 1, :chunk_size) = 0';
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
task_name => task_name,
sql_stmt => chunk_sql,
by_rowid => FALSE
);
-- Define MERGE statement
merge_stmt := ip_merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id ' || ip_merge_stmt_2;
-- Run task in parallel
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => task_name,
sql_stmt => merge_stmt,
language_flag => DBMS_SQL.NATIVE,
parallel_level => 8
);
-- Retry mechanism
l_try := 0;
l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name => task_name);
WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED) LOOP
DBMS_OUTPUT.PUT_LINE('Something went wrong in parallel execute - retrying');
l_try := l_try + 1;
DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name => task_name);
l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name => task_name);
END LOOP;
-- Check status and log if failed
IF l_status != DBMS_PARALLEL_EXECUTE.FINISHED THEN
INSERT INTO error_log (timestamp, procedure_name, error_message)
VALUES (SYSDATE, 'compare_tables_with_multi', 'Parallel execution failed after retries for table: ' || table_names(i));
COMMIT;
RAISE_APPLICATION_ERROR(-20006, 'Parallel execution failed after retries for table: ' || table_names(i));
END IF;
-- Drop the task
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => task_name);
END LOOP;
COMMIT;
EXCEPTION
WHEN OTHERS THEN
INSERT INTO error_log (timestamp, procedure_name, error_message)
VALUES (SYSDATE, 'compare_tables_with_multi', SQLERRM || ' Stack: ' || DBMS_UTILITY.FORMAT_ERROR_STACK);
COMMIT;
ROLLBACK;
RAISE;
END compare_tables_with_multi;
/
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (0)