DEV Community

Arpit Tiwari
Arpit Tiwari

Posted on

Updated


CREATE OR REPLACE PROCEDURE compare_tables_with_multi (
    table_names IN SYS.ODCIVARCHAR2LIST,
    l_batch_no IN NUMBER
) AS
    /*
     * Runs the configured MERGE for each table in table_names, in parallel
     * chunks, through DBMS_PARALLEL_EXECUTE.
     *
     * For every table name the procedure:
     *   1. validates the name against a strict identifier pattern,
     *   2. reads primary_key, merge_stmt_1 and merge_stmt_2 from
     *      temp_date_col_mapper,
     *   3. creates a task and chunks cr_5_0_x_<table_name> into row-number
     *      ranges of chunk_size rows,
     *   4. runs merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id '
     *      || merge_stmt_2 once per chunk,
     *   5. resumes the task up to c_max_retries times if it did not finish,
     *   6. drops the task on success.
     *
     * Parameters:
     *   table_names - table name suffixes to process; each must match
     *                 ^[A-Za-z0-9_]+$ and exist in temp_date_col_mapper.
     *   l_batch_no  - not used by this procedure; kept so existing callers
     *                 keep compiling.
     *
     * Raises:
     *   -20001  table name is NULL or contains illegal characters
     *   -20002  table name has no row in temp_date_col_mapper
     *   -20003  table name has more than one row in temp_date_col_mapper
     *   -20004  metadata row disappeared between the count and the fetch
     *   -20005  primary_key is NULL or contains illegal characters
     *   -20006  the parallel task did not reach FINISHED after the retries
     * Any error is written to error_log in an autonomous transaction, the
     * current transaction is rolled back, and the error is re-raised.
     */
    c_procedure_name     CONSTANT VARCHAR2(30) := 'compare_tables_with_multi';
    c_identifier_pattern CONSTANT VARCHAR2(30) := '^[A-Za-z0-9_]+$';
    c_parallel_level     CONSTANT PLS_INTEGER  := 8;
    c_max_retries        CONSTANT PLS_INTEGER  := 2;

    chunk_size         NUMBER(10) := 20000;  -- rows per chunk
    merge_stmt         CLOB;
    ip_merge_stmt_1    CLOB;
    ip_merge_stmt_2    CLOB;
    ip_tbl_primary_col VARCHAR2(1000);
    chunk_sql          VARCHAR2(4000);
    task_name          VARCHAR2(1000);
    l_mapper_rows      PLS_INTEGER;
    l_try              NUMBER(5);
    l_status           NUMBER(5);
    l_error_message    VARCHAR2(4000);

    -- Persists one row in error_log independently of the caller's transaction,
    -- so the log entry survives the ROLLBACK issued by the exception handler.
    PROCEDURE log_error (p_message IN VARCHAR2) IS
        PRAGMA AUTONOMOUS_TRANSACTION;
    BEGIN
        INSERT INTO error_log (timestamp, procedure_name, error_message)
        VALUES (SYSDATE, c_procedure_name, p_message);
        COMMIT;
    END log_error;
BEGIN
    FOR i IN 1..table_names.COUNT LOOP
        -- Validate table_name: it is concatenated into dynamic SQL below,
        -- so only plain identifier characters are accepted.
        IF table_names(i) IS NULL OR NOT REGEXP_LIKE(table_names(i), c_identifier_pattern) THEN
            RAISE_APPLICATION_ERROR(-20001, 'Invalid table_name: ' || table_names(i));
        END IF;

        -- Verify that table_name exists in temp_date_col_mapper
        SELECT COUNT(*)
        INTO l_mapper_rows
        FROM temp_date_col_mapper
        WHERE table_name = table_names(i);

        IF l_mapper_rows = 0 THEN
            RAISE_APPLICATION_ERROR(-20002, 'Table name ' || table_names(i) || ' not found in temp_date_col_mapper');
        END IF;

        -- Get table's metadata. No row limit here: duplicate mapper rows must
        -- surface as TOO_MANY_ROWS instead of silently picking an arbitrary one.
        BEGIN
            SELECT primary_key, merge_stmt_1, merge_stmt_2
            INTO ip_tbl_primary_col, ip_merge_stmt_1, ip_merge_stmt_2
            FROM temp_date_col_mapper
            WHERE table_name = table_names(i);
        EXCEPTION
            WHEN TOO_MANY_ROWS THEN
                RAISE_APPLICATION_ERROR(-20003, 'Multiple rows found in temp_date_col_mapper for table_name: ' || table_names(i));
            WHEN NO_DATA_FOUND THEN
                RAISE_APPLICATION_ERROR(-20004, 'No metadata found in temp_date_col_mapper for table_name: ' || table_names(i));
        END;

        -- Validate ip_tbl_primary_col. The NULL test is explicit because
        -- NOT REGEXP_LIKE(NULL, ...) evaluates to NULL and would skip the check.
        IF ip_tbl_primary_col IS NULL OR NOT REGEXP_LIKE(ip_tbl_primary_col, c_identifier_pattern) THEN
            RAISE_APPLICATION_ERROR(-20005, 'Invalid primary column name: ' || ip_tbl_primary_col);
        END IF;

        -- Define task name
        task_name := SUBSTR('COMPARE_' || table_names(i) || '_' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS'), 1, 128);

        -- Create the task
        DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);

        -- Chunk the table based on ROW_NUMBER(). CREATE_CHUNKS_BY_SQL accepts
        -- only the statement text (no bind values), so the chunk size is
        -- embedded as a literal; it is a local NUMBER, not caller input.
        chunk_sql := 'SELECT rn AS start_id, rn + ' || TO_CHAR(chunk_size) || ' - 1 AS end_id ' ||
                     'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn ' ||
                     'FROM cr_5_0_x_' || table_names(i) || ' a) ' ||
                     'WHERE MOD(rn - 1, ' || TO_CHAR(chunk_size) || ') = 0';

        DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
            task_name => task_name,
            sql_stmt => chunk_sql,
            by_rowid => FALSE
        );

        -- Define MERGE statement; :start_id and :end_id are the chunk bounds
        -- supplied by RUN_TASK for each chunk.
        merge_stmt := ip_merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id ' || ip_merge_stmt_2;

        -- Run task in parallel
        DBMS_PARALLEL_EXECUTE.RUN_TASK(
            task_name => task_name,
            sql_stmt => merge_stmt,
            language_flag => DBMS_SQL.NATIVE,
            parallel_level => c_parallel_level
        );

        -- Retry mechanism: resume the task until it finishes or retries run out
        l_try := 0;
        l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name => task_name);
        WHILE (l_try < c_max_retries AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED) LOOP
            DBMS_OUTPUT.PUT_LINE('Something went wrong in parallel execute - retrying');
            l_try := l_try + 1;
            DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name => task_name);
            l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name => task_name);
        END LOOP;

        -- Fail if the task still has not finished. Logging happens once, in the
        -- outer handler. The task is not dropped on this path.
        -- NOTE(review): presumably the failed task is kept so its chunk errors
        -- can be inspected - confirm, otherwise drop it before raising.
        IF l_status != DBMS_PARALLEL_EXECUTE.FINISHED THEN
            RAISE_APPLICATION_ERROR(-20006, 'Parallel execution failed after retries for table: ' || table_names(i));
        END IF;

        -- Drop the task
        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => task_name);
    END LOOP;

    COMMIT;

EXCEPTION
    WHEN OTHERS THEN
        -- SQLERRM cannot be referenced inside a SQL statement, so the message
        -- is assembled in PL/SQL before it is handed to the logger.
        l_error_message := SQLERRM || ' Stack: ' || DBMS_UTILITY.FORMAT_ERROR_STACK;
        ROLLBACK;
        log_error(l_error_message);
        RAISE;
END compare_tables_with_multi;
/

Enter fullscreen mode Exit fullscreen mode

Top comments (0)