CREATE OR REPLACE PROCEDURE compare_tables_with_multi (
    TABLE_NAME IN VARCHAR2,
    l_batch_no IN NUMBER
) AS
    /*
     * Runs the MERGE statement configured for TABLE_NAME in parallel, in
     * row-number chunks, through DBMS_PARALLEL_EXECUTE.
     *
     * Parameters:
     *   TABLE_NAME  Logical table name. It is looked up in temp_date_col_mapper
     *               and appended to the 'cr_5_0_x_' prefix to form the name of
     *               the table that is chunked. Only letters, digits and
     *               underscores are accepted because the value is concatenated
     *               into dynamic SQL.
     *   l_batch_no  Not used by this procedure; kept so existing callers keep
     *               working.
     *
     * Raises:
     *   -20001  TABLE_NAME is NULL or contains characters other than
     *           letters, digits and underscores.
     *   -20002  The primary key column stored in temp_date_col_mapper is NULL
     *           or contains other characters.
     *   -20003  The task did not reach FINISHED after the initial run plus
     *           c_max_retries resume attempts.
     *   -20004  More than one metadata row matches TABLE_NAME.
     *   -20005  No metadata row matches TABLE_NAME.
     *
     * On any error the current transaction is rolled back, the error is
     * written to error_log in an autonomous transaction, and the exception is
     * re-raised. The parallel-execute task is only dropped on success; after
     * a failure it is left in place.
     */
    c_proc_name      CONSTANT VARCHAR2(30) := 'compare_tables_with_multi';
    c_chunk_size     CONSTANT PLS_INTEGER  := 20000;
    c_max_retries    CONSTANT PLS_INTEGER  := 2;
    c_parallel_level CONSTANT PLS_INTEGER  := 8;

    ip_merge_stmt_1    CLOB;
    ip_merge_stmt_2    CLOB;
    merge_stmt         CLOB;
    ip_tbl_primary_col VARCHAR2(1000);
    chunk_sql          VARCHAR2(4000);
    task_name          VARCHAR2(1000);
    l_try              NUMBER(5);
    l_status           NUMBER(5);
    l_error_text       VARCHAR2(4000);

    -- Writes one row to error_log in its own transaction, so the log entry
    -- survives the caller's ROLLBACK and the caller's work is never committed
    -- as a side effect of logging.
    PROCEDURE log_error (p_message IN VARCHAR2) IS
        PRAGMA AUTONOMOUS_TRANSACTION;
    BEGIN
        INSERT INTO error_log (timestamp, procedure_name, error_message)
        VALUES (SYSDATE, c_proc_name, p_message);
        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            -- Best effort: a failure to log must not replace the original
            -- error, which the caller re-raises right after this call.
            ROLLBACK;
    END log_error;
BEGIN
    -- TABLE_NAME is concatenated into dynamic SQL, so restrict it to
    -- identifier characters. NULL is checked explicitly because
    -- NOT REGEXP_LIKE(NULL, ...) evaluates to NULL and would skip the check.
    IF TABLE_NAME IS NULL OR NOT REGEXP_LIKE(TABLE_NAME, '^[A-Za-z0-9_]+$') THEN
        RAISE_APPLICATION_ERROR(-20001, 'Invalid TABLE_NAME: ' || TABLE_NAME);
    END IF;

    -- Load the metadata for this table. The parameter is qualified with the
    -- procedure name: inside a SQL statement an unqualified TABLE_NAME
    -- resolves to the column, which would compare the column with itself.
    -- No row limit is applied, so duplicate mapper rows raise TOO_MANY_ROWS
    -- instead of one being picked arbitrarily.
    BEGIN
        SELECT m.primary_key,
               m.merge_stmt_1,
               m.merge_stmt_2
          INTO ip_tbl_primary_col,
               ip_merge_stmt_1,
               ip_merge_stmt_2
          FROM temp_date_col_mapper m
         WHERE m.table_name = compare_tables_with_multi.TABLE_NAME;
    EXCEPTION
        WHEN TOO_MANY_ROWS THEN
            RAISE_APPLICATION_ERROR(-20004, 'Multiple rows found in temp_date_col_mapper for table_name: ' || TABLE_NAME);
        WHEN NO_DATA_FOUND THEN
            RAISE_APPLICATION_ERROR(-20005, 'No metadata found in temp_date_col_mapper for table_name: ' || TABLE_NAME);
    END;

    -- The primary key column is also concatenated into dynamic SQL.
    IF ip_tbl_primary_col IS NULL OR NOT REGEXP_LIKE(ip_tbl_primary_col, '^[A-Za-z0-9_]+$') THEN
        RAISE_APPLICATION_ERROR(-20002, 'Invalid primary column name: ' || ip_tbl_primary_col);
    END IF;

    -- Task name: prefix + table + timestamp, truncated to 128 characters.
    task_name := SUBSTR('COMPARE_' || TABLE_NAME || '_' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS'), 1, 128);

    DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);

    -- Chunk by ROW_NUMBER() over the primary key column: one chunk per
    -- c_chunk_size rows, returned as (start_id, end_id) row-number ranges.
    -- The chunk size is embedded as a literal because this call passes no
    -- bind values to the chunking query.
    chunk_sql := 'SELECT rn AS start_id, rn + ' || TO_CHAR(c_chunk_size) || ' - 1 AS end_id ' ||
                 'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn ' ||
                 'FROM cr_5_0_x_' || TABLE_NAME || ' a) ' ||
                 'WHERE MOD(rn - 1, ' || TO_CHAR(c_chunk_size) || ') = 0';

    DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
        task_name => task_name,
        sql_stmt  => chunk_sql,
        by_rowid  => FALSE
    );

    -- Build the per-chunk MERGE: the two stored fragments are joined by the
    -- chunk filter. :start_id and :end_id are bound by RUN_TASK per chunk.
    -- NOTE(review): this presumes merge_stmt_1 exposes a column named rn that
    -- is numbered the same way as in chunk_sql - confirm against the mapper data.
    merge_stmt := ip_merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id ' || ip_merge_stmt_2;

    DBMS_PARALLEL_EXECUTE.RUN_TASK(
        task_name      => task_name,
        sql_stmt       => merge_stmt,
        language_flag  => DBMS_SQL.NATIVE,
        parallel_level => c_parallel_level
    );

    -- Resume the task up to c_max_retries times while it is not FINISHED.
    l_try := 0;
    l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name => task_name);
    WHILE (l_try < c_max_retries AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED) LOOP
        DBMS_OUTPUT.PUT_LINE('Something went wrong in parallel execute - retrying');
        l_try := l_try + 1;
        DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name => task_name);
        l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name => task_name);
    END LOOP;

    -- Still not finished: raise. The handler below writes the single
    -- error_log entry for this failure.
    IF l_status != DBMS_PARALLEL_EXECUTE.FINISHED THEN
        RAISE_APPLICATION_ERROR(-20003, 'Parallel execution failed after retries for table: ' || TABLE_NAME);
    END IF;

    DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => task_name);
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        -- SQLERRM cannot be referenced directly inside a SQL statement, so
        -- the text is captured in PL/SQL first. It is capped at 4000
        -- characters; NOTE(review): error_log.error_message may be narrower.
        l_error_text := SUBSTR(SQLERRM || ' Stack: ' || DBMS_UTILITY.FORMAT_ERROR_STACK, 1, 4000);
        -- Undo this session's uncommitted work before logging; the log row
        -- is committed separately by the autonomous log_error.
        ROLLBACK;
        log_error(l_error_text);
        RAISE;
END compare_tables_with_multi;
/
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (0)