Step by step to test stream on 10gr2

db version: 10.2.0.4
test platform: windows xp
source db: orcl
dest db: orcl1

--by default, every command below is executed on the source database unless noted otherwise

SQL> show parameter compati

NAME                                 TYPE        VALUE
———————————— ———– ————
compatible                           string      10.2.0.3.0
sga_max_size                         big integer 276M
sga_target                           big integer 276M
processes                            integer     150
timed_statistics                     boolean     TRUE
log_archive_dest_1                   string      LOCATION=D:\oracle\oradata\orcl\archive
log_archive_format                   string      ORCL_ARC%S_%R.%T
undo_retention                       integer     7200
remote_archive_enable                string      TRUE
global_names                         boolean     TRUE
open_links                           integer     10
job_queue_processes                  integer     10
parallel_max_servers                 integer     20
streams_pool_size                    big integer 44M

–enable database supplemental logging
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;

Database altered.

–create users for test
SQL> create user t identified by t default tablespace users temporary tablespace temp;

User created.

SQL> grant dba to t;

Grant succeeded.

conn t/t
SQL> create table mystream (id number, name varchar(100));

Table created.

SQL> insert into mystream values(1,’a’);

1 row created.

SQL> commit;

Commit complete.

--dest db
--Create the schema owner T on the destination as well.
create user t identified by t default tablespace users temporary tablespace temp;
grant dba to t;

--create database link
--The link connects as tadmin, the Streams administrator created just below.
CREATE public DATABASE LINK orcl1 CONNECT TO tadmin identified by tadmin USING 'ORCL1';

--source and dest db
--tadmin is the stream administrator user, MUST be different from SYS and SYSTEM
CREATE USER tadmin IDENTIFIED BY tadmin DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE, DBA TO tadmin;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(grantee => 'TADMIN', grant_privileges => TRUE);

--Directory objects used later as source/destination directory of MAINTAIN_SCHEMAS.
--Comments are kept on their own lines: SQL*Plus does not accept a comment
--after the statement terminator on the same line.
--source db
create directory REORG_DIR as 'D:\oracle\reorg';
--dest db
create directory REORG_DIR as 'C:\oracle\reorg';

--source db
--Configure Streams replication of schema T from ORCL to ORCL1 in one call.
--include_ddl is not passed here, so this first run relies on its default.
conn tadmin/tadmin

begin
  DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS(
    schema_names                 => 'T',
    source_database              => 'ORCL',
    source_directory_object      => 'REORG_DIR',
    destination_directory_object => 'REORG_DIR',
    destination_database         => 'ORCL1',
    -- TRUE: perform the configuration now
    perform_actions              => TRUE
  );
end;
/

–source db
T@orcl>select * from mystream;

ID NAME
———- ——————–
1 a

–check dest db
SQL> conn t/t
Connected.
SQL> select * from mystream;

ID NAME
———- ——————–
1 a

–add data to source table
T@orcl>insert into mystream values(2,’b’);

1 row created.

T@orcl>insert into mystream values(3,’c’);

1 row created.

T@orcl>commit;

Commit complete.

T@orcl>select * from mystream;

ID NAME
———- ——————–
1 a
2 b
3 c

–check dest db
SQL> select * from mystream;

ID NAME
———- ——————–
1 a
2 b
3 c

–add many data to source db
T@orcl>insert into mystream select object_id,object_name from dba_objects;

46767 rows created.

T@orcl>commit;

Commit complete.

T@orcl>select count(*) from mystream;

COUNT(*)
———-
46770

–check dest db
SQL> select count(*) from mystream;

COUNT(*)
———-
3  –it is still 3

–wait 3+ minutes and check again. It is slow.
SQL> select count(*) from mystream;

COUNT(*)
———-
46770

--let's recreate the Streams configuration with DDL replication enabled
--both source and dest db
--Remove the existing Streams configuration and rebuild the Streams
--administrator from scratch.
conn / as sysdba
exec dbms_streams_adm.remove_streams_configuration();
drop user tadmin cascade;
CREATE USER tadmin IDENTIFIED BY tadmin DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE, DBA TO tadmin;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(grantee => 'TADMIN', grant_privileges => TRUE);

--dest db
--Drop the replicated schema so the next MAINTAIN_SCHEMAS run starts clean.
drop user t cascade;
--source db
--Empty the test table before the second run.
truncate table t.mystream;

--recreate with ddl enable, source db
--Same call as the first configuration, with include_ddl => TRUE added so
--that DDL on schema T is replicated as well as DML.
CONN tadmin/tadmin
begin
  DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS(
    schema_names                 => 'T',
    source_database              => 'ORCL',
    source_directory_object      => 'REORG_DIR',
    destination_directory_object => 'REORG_DIR',
    destination_database         => 'ORCL1',
    perform_actions              => TRUE,
    include_ddl                  => TRUE);
end;
/

--add data
--Insert one row on the source as T to confirm DML still replicates.
conn t/t
insert into mystream values (1,'ddl');
commit;

–check dest
SQL> select * from mystream;

ID NAME
———- ——————–
1 ddl

–dml is ok, let’s do ddl test
–source db, create table
T@orcl>create table mystream2 as select * from dba_objects where 1=0;

Table created.

–check dest db
SQL> select table_name from user_tables;

TABLE_NAME
——————————
MYSTREAM
MYSTREAM2

–mystream2 is there. DDL is ok too.
–let’s test tons of data insert
T@orcl>insert into mystream2 select * from dba_objects;

46807 rows created.

T@orcl>insert into mystream2 select * from mystream2;

46807 rows created.

T@orcl>/

93614 rows created.

T@orcl>commit;

Commit complete.

T@orcl>insert into mystream2 select * from mystream2;

187228 rows created.

T@orcl>commit;

Commit complete.

–wait n minutes, check dest db
SQL> select count(*) from mystream2;

COUNT(*)
———-
0 --it is still ZERO!

–check source db alert log, get this
Wed Oct 29 15:47:38 2008
Error 600 occured while spilling buffered messages
Wed Oct 29 15:47:38 2008
Errors in file d:\oracle\admin\orcl\bdump\orcl_q004_5340.trc:
ORA-00600: internal error code, arguments: [kwqbmcrcpts101], [], [], [], [], [], [], []
–according to metalink Note:435741.1, this bug is fixed in 11g

–also get low SGA error
Wed Oct 29 15:47:59 2008
Propagation Schedule for (TADMIN.ORCL$CAPQ, “TADMIN”.”ORCL$APPQ”@ORCL1) encountered following error:
ORA-23603: STREAMS enqueue aborted due to low SGA
this is my sga size:
sga_max_size                         big integer 276M
sga_target                           big integer 276M

My impression so far: Streams is not stable or efficient enough in 10gR2.

Advertisements

About Alex Zeng
I would be very happy if this blog can help you. I appreciate every honest comment. Please forgive me if I'm too busy to reply to your comments in time.

4 Responses to Step by step to test stream on 10gr2

  1. Alex says:

    –We can use the procedure DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS to generate scripts. Here is an example.
    -- SQL*Plus settings: echo each command, suppress old/new substitution
    -- listings, and abort the script with the SQL error code on any failure.
    SET ECHO ON
    SET VERIFY OFF
    WHENEVER SQLERROR EXIT SQL.SQLCODE;

    -------------------------------------------------------------------
    -- get TNSNAME and streams admin user details for both the databases
    -- (taken from the six positional script parameters)
    -------------------------------------------------------------------
    PROMPT
    PROMPT 'Enter TNS Name of site 1 as parameter 1:'
    DEFINE db1 = &1
    PROMPT
    PROMPT 'Enter streams admin username for site 1 as parameter 2:'
    DEFINE strm_adm_db1 = &2
    PROMPT
    PROMPT 'Enter streams admin password for site 1 as parameter 3:'
    DEFINE strm_adm_pwd_db1 = &3
    PROMPT
    PROMPT 'Enter TNS Name of site 2 as parameter 4:'
    DEFINE db2 = &4
    PROMPT
    PROMPT 'Enter streams admin username for site 2 as parameter 5:'
    DEFINE strm_adm_db2 = &5
    PROMPT
    PROMPT 'Enter streams admin password for site 2 as parameter 6:'
    DEFINE strm_adm_pwd_db2 = &6

    -- connect as streams administrator to site 1
    PROMPT Connecting as streams administrator to site 1
    CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1

    -- Add supplemental log group for table "T"."MYSTREAM"
    -- Enables primary key, foreign key and unique index supplemental logging;
    -- the "already exists" error (-32588) is ignored so the step can be re-run.

    BEGIN
      EXECUTE IMMEDIATE 'ALTER TABLE "T"."MYSTREAM" ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, FOREIGN KEY, UNIQUE INDEX) COLUMNS';
    EXCEPTION WHEN OTHERS THEN
      IF sqlcode = -32588 THEN NULL; -- Logging attribute exists
      ELSE RAISE;
      END IF;
    END;
    /

    -- Add supplemental log group for table "T"."T"
    -- Enables primary key, foreign key and unique index supplemental logging;
    -- the "already exists" error (-32588) is ignored so the step can be re-run.

    BEGIN
      EXECUTE IMMEDIATE 'ALTER TABLE "T"."T" ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, FOREIGN KEY, UNIQUE INDEX) COLUMNS';
    EXCEPTION WHEN OTHERS THEN
      IF sqlcode = -32588 THEN NULL; -- Logging attribute exists
      ELSE RAISE;
      END IF;
    END;
    /

    -- Set up queue "TADMIN"."ORCL$CAPQ"
    -- Creates the queue (backed by queue table ORCL$CAPQT) that the capture
    -- and propagation steps of this script refer to on site 1.

    BEGIN
      dbms_streams_adm.set_up_queue(
        queue_table    => '"TADMIN"."ORCL$CAPQT"',
        storage_clause => NULL,
        queue_name     => '"TADMIN"."ORCL$CAPQ"',
        queue_user     => '');
    END;
    /

    -- PROPAGATE changes for schema T
    -- Adds propagation rules for schema T from "TADMIN"."ORCL$CAPQ" to
    -- "TADMIN"."ORCL$APPQ"@ORCL1. Queue-to-queue propagation is used only when
    -- the destination reports a compatibility of 10.2 or higher; if the
    -- remote version lookup fails for any reason, it falls back to FALSE.

    DECLARE
      version_num NUMBER := 0;
      release_num NUMBER := 0;
      pos         NUMBER;
      initpos     NUMBER;
      q2q         BOOLEAN;
      stmt        VARCHAR2(100);
      ver         VARCHAR2(30);
      compat      VARCHAR2(30);

    BEGIN
      BEGIN
        -- Ask the destination database for its version and compatibility
        stmt := 'BEGIN dbms_utility.db_version@ORCL1(:ver, :compat); END;';
        EXECUTE IMMEDIATE stmt USING OUT ver, OUT compat;
        -- Extract version number
        initpos := 1;
        pos := INSTR(compat, '.', initpos, 1);
        IF pos > 0 THEN
          version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
          initpos := pos + 1;

          -- Extract release number
          pos := INSTR(compat, '.', initpos, 1);
          IF pos > 0 THEN
            release_num := TO_NUMBER(SUBSTR(compat, initpos,
                                            pos - initpos));
            initpos := pos + 1;
          ELSE
            release_num := TO_NUMBER(SUBSTR(compat, initpos));
          END IF;
        ELSE
          -- No dot at all: the whole string is the version number
          version_num := TO_NUMBER(SUBSTR(compat, initpos));
        END IF;

        -- use q2q propagation if compatibility >= 10.2
        IF version_num > 10 OR
           (version_num = 10 AND release_num >= 2) THEN
          q2q := TRUE;
        ELSE
          q2q := FALSE;
        END IF;

      EXCEPTION WHEN OTHERS THEN
        q2q := FALSE;
      END;

      dbms_streams_adm.add_schema_propagation_rules(
        schema_name            => '"T"',
        streams_name           => '',
        source_queue_name      => '"TADMIN"."ORCL$CAPQ"',
        destination_queue_name => '"TADMIN"."ORCL$APPQ"@ORCL1',
        include_dml            => TRUE,
        include_ddl            => TRUE,
        include_tagged_lcr     => TRUE,
        source_database        => 'ORCL',
        inclusion_rule         => TRUE,
        and_condition          => NULL,
        queue_to_queue         => q2q);
    END;
    /

    -- Disable propagation. Enable after destination has been setup
    -- Looks up whether the propagation is queue-to-queue; only in that case
    -- is the destination queue name passed to disable_propagation_schedule.

    DECLARE
      q2q     VARCHAR2(10);
      destn_q VARCHAR2(65);
    BEGIN
      SELECT queue_to_queue INTO q2q
        FROM dba_propagation
       WHERE source_queue_owner = 'TADMIN' AND
             source_queue_name = 'ORCL$CAPQ' AND
             destination_queue_owner = 'TADMIN' AND
             destination_queue_name = 'ORCL$APPQ' AND
             destination_dblink = 'ORCL1';

      IF q2q = 'TRUE' THEN
        destn_q := '"TADMIN"."ORCL$APPQ"';
      ELSE
        destn_q := NULL;
      END IF;

      dbms_aqadm.disable_propagation_schedule(
        queue_name        => '"TADMIN"."ORCL$CAPQ"',
        destination       => 'ORCL1',
        destination_queue => destn_q);
    EXCEPTION WHEN OTHERS THEN
      IF sqlcode = -24065 THEN NULL; -- propagation already disabled
      ELSE RAISE;
      END IF;
    END;
    /

    -- CAPTURE changes for schema T
    -- Reads the local "compatible" parameter, maps it to the matching
    -- dbms_streams.compatible_* function and adds capture rules for schema T
    -- whose and_condition restricts LCRs to that compatibility level.
    --
    -- NOTE(review): the published text lost everything between "<=" and the
    -- next "=>" (it was stripped as an HTML tag). The concatenation with
    -- compat_func and the add_schema_rules call header are reconstructed from
    -- the surviving named parameters and the otherwise unused compat_func.

    DECLARE
      compat         VARCHAR2(512);
      initpos        NUMBER;
      pos            NUMBER;
      version_num    NUMBER;
      release_num    NUMBER;
      compat_func    VARCHAR2(65);
      get_compatible VARCHAR2(4000);
    BEGIN
      SELECT value INTO compat
        FROM v$parameter
       WHERE name = 'compatible';

      -- Extract version number
      initpos := 1;
      pos := INSTR(compat, '.', initpos, 1);
      IF pos > 0 THEN
        version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
        initpos := pos + 1;

        -- Extract release number
        pos := INSTR(compat, '.', initpos, 1);
        IF pos > 0 THEN
          release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
          initpos := pos + 1;
        ELSE
          release_num := TO_NUMBER(SUBSTR(compat, initpos));
        END IF;
      END IF;

      -- Pick the compatibility function; anything above 10 (or an
      -- unparsed, NULL version) ends up on compatible_10_2.
      IF version_num < 10 THEN
        compat_func := 'dbms_streams.compatible_9_2';
      ELSIF version_num = 10 THEN
        IF release_num < 2 THEN
          compat_func := 'dbms_streams.compatible_10_1';
        ELSE
          compat_func := 'dbms_streams.compatible_10_2';
        END IF;
      ELSE
        compat_func := 'dbms_streams.compatible_10_2';
      END IF;

      get_compatible := ':lcr.get_compatible() <= '||compat_func;

      dbms_streams_adm.add_schema_rules(
        schema_name        => '"T"',
        streams_type       => 'CAPTURE',
        streams_name       => '"ORCL$CAP"',
        queue_name         => '"TADMIN"."ORCL$CAPQ"',
        include_dml        => TRUE,
        include_ddl        => TRUE,
        include_tagged_lcr => TRUE,
        source_database    => 'ORCL',
        inclusion_rule     => TRUE,
        and_condition      => get_compatible);
    END;
    /

    -- Datapump SCHEMA MODE EXPORT
    -- Exports schema T to expdat26.dmp in directory object REORG_DIR, polls
    -- the job every 10 seconds until it is COMPLETED or STOPPED, then copies
    -- the dump file to REORG_DIR on ORCL1 with dbms_file_transfer.put_file.
    -- Any error rolls back and is re-raised.

    DECLARE
      h1               NUMBER;                  -- data pump job handle
      schema_expr_list VARCHAR2(32767);         -- for metadata_filter
      object_owner     dbms_utility.uncl_array; -- obj owners
      job_state        VARCHAR2(30);            -- job state
      status           ku$_Status;              -- data pump status
      job_not_exist    exception;
      pragma exception_init(job_not_exist, -31626);
    BEGIN

      -- Build the filter list "('T')" from the owner array
      object_owner(1) := 'T';
      FOR idx IN 1..1 LOOP
        -- schema does not exist locally, need instantiation
        IF schema_expr_list IS NULL THEN
          schema_expr_list := '(';
        ELSE
          schema_expr_list := schema_expr_list ||',';
        END IF;
        schema_expr_list := schema_expr_list||''''||object_owner(idx)||'''';
      END LOOP;
      IF schema_expr_list IS NOT NULL THEN
        schema_expr_list := schema_expr_list || ')';
      ELSE
        COMMIT;
        RETURN;
      END IF;

      h1 := dbms_datapump.open(operation=>'EXPORT', job_mode=>'SCHEMA',
                               remote_link=>'',
                               job_name=>NULL, version=>'COMPATIBLE');

      dbms_datapump.metadata_filter(
        handle=>h1,
        name=>'SCHEMA_EXPR',
        value=>'IN'||schema_expr_list);

      dbms_datapump.add_file(
        handle=>h1,
        filename=>'expdat26.dmp',
        directory=>'REORG_DIR',
        filetype=>dbms_datapump.ku$_file_type_dump_file);
      dbms_datapump.add_file(
        handle=>h1,
        filename=>'expdat26.dlg',
        directory=>'REORG_DIR',
        filetype=>dbms_datapump.ku$_file_type_log_file);

      dbms_datapump.start_job(h1);

      job_state := 'UNDEFINED';
      BEGIN
        WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
          status := dbms_datapump.get_status(
            handle  => h1,
            mask    => dbms_datapump.ku$_status_job_error +
                       dbms_datapump.ku$_status_job_status +
                       dbms_datapump.ku$_status_wip,
            timeout => -1);
          job_state := status.job_status.state;
          dbms_lock.sleep(10);
        END LOOP;
      EXCEPTION WHEN job_not_exist THEN
        -- -31626: the job is gone, which this script treats as finished
        dbms_output.put_line('job finished');
      END;

      -- Transfer dump file to the destination directory
      dbms_file_transfer.put_file(
        source_directory_object      => '"REORG_DIR"',
        source_file_name             => 'expdat26.dmp',
        destination_directory_object => '"REORG_DIR"',
        destination_file_name        => 'expdat26.dmp',
        destination_database         => 'ORCL1');

      COMMIT;
    EXCEPTION WHEN OTHERS THEN
      ROLLBACK;
      RAISE;
    END;
    /

    -- Start capture process ORCL$CAP
    -- Error -26666 (process already running) is ignored so the step can be
    -- re-run; any other error is re-raised.

    BEGIN
      dbms_capture_adm.start_capture(
        capture_name => '"ORCL$CAP"');
    EXCEPTION WHEN OTHERS THEN
      IF sqlcode = -26666 THEN NULL; -- CAPTURE process already running
      ELSE RAISE;
      END IF;
    END;
    /
    -- connect as streams administrator to site 2
    -- (credentials and TNS name come from the DEFINEs at the top of the script)
    PROMPT Connecting as streams administrator to site 2
    CONNECT &strm_adm_db2/&strm_adm_pwd_db2@&db2

    -- Datapump SCHEMA MODE IMPORT
    -- Imports expdat26.dmp from directory object REORG_DIR on site 2 and polls
    -- the job every 10 seconds until it is COMPLETED or STOPPED.
    -- Any error rolls back and is re-raised.

    DECLARE
      h1               NUMBER;                  -- data pump job handle
      schema_expr_list VARCHAR2(32767);         -- for metadata_filter
      object_owner     dbms_utility.uncl_array; -- obj owners
      job_state        VARCHAR2(30);            -- job state
      status           ku$_Status;              -- data pump status
      job_not_exist    exception;
      pragma exception_init(job_not_exist, -31626);
    BEGIN

      -- The list is built as in the export step; in this block it only
      -- decides whether there is anything to import (no filter call uses it).
      object_owner(1) := 'T';
      FOR idx IN 1..1 LOOP
        -- schema does not exist locally, need instantiation
        IF schema_expr_list IS NULL THEN
          schema_expr_list := '(';
        ELSE
          schema_expr_list := schema_expr_list ||',';
        END IF;
        schema_expr_list := schema_expr_list||''''||object_owner(idx)||'''';
      END LOOP;
      IF schema_expr_list IS NOT NULL THEN
        schema_expr_list := schema_expr_list || ')';
      ELSE
        COMMIT;
        RETURN;
      END IF;

      h1 := dbms_datapump.open(operation=>'IMPORT', job_mode=>'SCHEMA',
                               remote_link=>'',
                               job_name=>NULL, version=>'COMPATIBLE');

      dbms_datapump.add_file(
        handle=>h1,
        filename=>'expdat26.dmp',
        directory=>'REORG_DIR',
        filetype=>dbms_datapump.ku$_file_type_dump_file);
      dbms_datapump.add_file(
        handle=>h1,
        filename=>'expdat26.dlg',
        directory=>'REORG_DIR',
        filetype=>dbms_datapump.ku$_file_type_log_file);

      dbms_datapump.start_job(h1);

      job_state := 'UNDEFINED';
      BEGIN
        WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
          status := dbms_datapump.get_status(
            handle  => h1,
            mask    => dbms_datapump.ku$_status_job_error +
                       dbms_datapump.ku$_status_job_status +
                       dbms_datapump.ku$_status_wip,
            timeout => -1);
          job_state := status.job_status.state;
          dbms_lock.sleep(10);
        END LOOP;
      EXCEPTION WHEN job_not_exist THEN
        -- -31626: the job is gone, which this script treats as finished
        dbms_output.put_line('job finished');
      END;
      COMMIT;
    EXCEPTION WHEN OTHERS THEN
      ROLLBACK;
      RAISE;
    END;
    /

    -- Set up queue "TADMIN"."ORCL$APPQ"
    -- Creates the queue (backed by queue table ORCL$APPQT) on site 2 that the
    -- propagation from site 1 targets and the apply rules read from.

    BEGIN
      dbms_streams_adm.set_up_queue(
        queue_table    => '"TADMIN"."ORCL$APPQT"',
        storage_clause => NULL,
        queue_name     => '"TADMIN"."ORCL$APPQ"',
        queue_user     => '');
    END;
    /

    -- APPLY changes for schema T
    -- Reads the local "compatible" parameter, maps it to the matching
    -- dbms_streams.compatible_* function and adds apply rules for schema T
    -- whose and_condition restricts LCRs to that compatibility level.
    --
    -- NOTE(review): the published text lost everything between "<=" and the
    -- next "=>" (it was stripped as an HTML tag). The concatenation with
    -- compat_func and the add_schema_rules call header are reconstructed from
    -- the surviving named parameters and the otherwise unused compat_func.

    DECLARE
      compat         VARCHAR2(512);
      initpos        NUMBER;
      pos            NUMBER;
      version_num    NUMBER;
      release_num    NUMBER;
      compat_func    VARCHAR2(65);
      get_compatible VARCHAR2(4000);
    BEGIN
      SELECT value INTO compat
        FROM v$parameter
       WHERE name = 'compatible';

      -- Extract version number
      initpos := 1;
      pos := INSTR(compat, '.', initpos, 1);
      IF pos > 0 THEN
        version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
        initpos := pos + 1;

        -- Extract release number
        pos := INSTR(compat, '.', initpos, 1);
        IF pos > 0 THEN
          release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
          initpos := pos + 1;
        ELSE
          release_num := TO_NUMBER(SUBSTR(compat, initpos));
        END IF;
      END IF;

      -- Pick the compatibility function; anything above 10 (or an
      -- unparsed, NULL version) ends up on compatible_10_2.
      IF version_num < 10 THEN
        compat_func := 'dbms_streams.compatible_9_2';
      ELSIF version_num = 10 THEN
        IF release_num < 2 THEN
          compat_func := 'dbms_streams.compatible_10_1';
        ELSE
          compat_func := 'dbms_streams.compatible_10_2';
        END IF;
      ELSE
        compat_func := 'dbms_streams.compatible_10_2';
      END IF;

      get_compatible := ':lcr.get_compatible() <= '||compat_func;

      dbms_streams_adm.add_schema_rules(
        schema_name        => '"T"',
        streams_type       => 'APPLY',
        streams_name       => '',
        queue_name         => '"TADMIN"."ORCL$APPQ"',
        include_dml        => TRUE,
        include_ddl        => TRUE,
        include_tagged_lcr => TRUE,
        source_database    => 'ORCL',
        inclusion_rule     => TRUE,
        and_condition      => get_compatible);
    END;
    /

    -- Get tag value to be used for Apply
    -- Starts from the object id of the apply process for source ORCL and
    -- increments it until no other apply process uses the same tag, then
    -- assigns that value as the apply tag.

    DECLARE
      found        BINARY_INTEGER := 0;
      tag_num      NUMBER;
      apply_nm     VARCHAR2(30);
      apply_nm_dqt VARCHAR2(32);
    BEGIN
      SELECT apply_name INTO apply_nm
        FROM dba_apply_progress
       WHERE source_database = 'ORCL';

      -- Double-quoted form of the name for the alter_apply call
      apply_nm_dqt := '"' || apply_nm || '"';
      -- Use the apply object id as the tag
      SELECT o.object_id INTO tag_num
        FROM dba_objects o
       WHERE o.object_name = apply_nm AND
             o.object_type = 'APPLY';
      LOOP
        BEGIN
          found := 0;
          SELECT 1 INTO found FROM dba_apply
           WHERE apply_name != apply_nm AND
                 apply_tag = hextoraw(tag_num);
        EXCEPTION WHEN no_data_found THEN
          -- No other apply process has this tag: keep tag_num
          EXIT;
        END;
        EXIT WHEN (found = 0);
        tag_num := tag_num + 1;
      END LOOP;
      -- alter apply
      dbms_apply_adm.alter_apply(
        apply_name => apply_nm_dqt,
        apply_tag  => hextoraw(tag_num));
    END;
    /

    -- Start apply process applying changes from ORCL
    -- The apply name is looked up by source database; error -26666 (process
    -- already running) is ignored, any other error is re-raised.

    DECLARE
      apply_nm     VARCHAR2(32);
      apply_nm_dqt VARCHAR2(32);
    BEGIN
      SELECT apply_name INTO apply_nm
        FROM dba_apply_progress
       WHERE source_database = 'ORCL';

      apply_nm_dqt := '"' || apply_nm || '"';
      dbms_apply_adm.start_apply(
        apply_name => apply_nm_dqt);
    EXCEPTION WHEN OTHERS THEN
      IF sqlcode = -26666 THEN NULL; -- APPLY process already running
      ELSE RAISE;
      END IF;
    END;
    /
    -- connect as streams administrator to site 1
    -- (credentials and TNS name come from the DEFINEs at the top of the script)
    PROMPT Connecting as streams administrator to site 1
    CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1

    -- Enable propagation schedule for "TADMIN"."ORCL$CAPQ"
    -- to ORCL1
    -- Looks up whether the propagation is queue-to-queue; only in that case
    -- is the destination queue name passed to enable_propagation_schedule.

    DECLARE
      q2q     VARCHAR2(10);
      destn_q VARCHAR2(65);
    BEGIN
      SELECT queue_to_queue INTO q2q
        FROM dba_propagation
       WHERE source_queue_owner = 'TADMIN' AND
             source_queue_name = 'ORCL$CAPQ' AND
             destination_queue_owner = 'TADMIN' AND
             destination_queue_name = 'ORCL$APPQ' AND
             destination_dblink = 'ORCL1';

      IF q2q = 'TRUE' THEN
        destn_q := '"TADMIN"."ORCL$APPQ"';
      ELSE
        destn_q := NULL;
      END IF;

      dbms_aqadm.enable_propagation_schedule(
        queue_name        => '"TADMIN"."ORCL$CAPQ"',
        destination       => 'ORCL1',
        destination_queue => destn_q);
    EXCEPTION WHEN OTHERS THEN
      IF sqlcode = -24064 THEN NULL; -- propagation already enabled
      ELSE RAISE;
      END IF;
    END;
    /

  2. charles says:

    Hi .
    I am trying to RUN Oracle streams via oracle gateway on a SQL server (DG4MSQL), the code is as below.

    The gateway is okay and i can query data across.

    connect strmadmin/strmadmin

    /* Queue */
    -- Creates the queue used by both the capture rules and the apply process
    -- below. The trailing "/" is required for SQL*Plus to run the block.

    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table => 'sqlserver_queue_table',
        queue_name  => 'sqlserver_queue');
    END;
    /

    /* create a capture process for sqlserver */
    -- Adds DML-only capture rules for scott.charlessql that enqueue into
    -- sqlserver_queue. The trailing "/" is required for SQL*Plus to run it.

    BEGIN
      DBMS_STREAMS_ADM.ADD_TABLE_RULES(
        table_name         => 'scott.charlessql',
        streams_type       => 'capture',
        streams_name       => 'sqlserver_capture',
        queue_name         => 'sqlserver_queue',
        include_dml        => true,
        include_ddl        => false,
        include_tagged_lcr => false);
    END;
    /

    /*INSERT FUNCTION FOR DML HANDLER*/
    -- insert_reg: copies every row of scott.charlessql, read as of the SCN at
    -- call time, into charlessql@dg4msql, commits, and prints the SCN used.
    -- On any error flashback mode is disabled, the cursor is closed and the
    -- error is re-raised.
    --
    -- NOTE(review): this procedure is registered below through
    -- SET_DML_HANDLER, whose documentation describes user_procedure as taking
    -- a single IN ANYDATA parameter. This procedure takes none - confirm
    -- whether that is why the handler never fires.
    create or replace
    PROCEDURE insert_reg IS
      CURSOR c1 IS
        SELECT test1, test2 FROM scott.charlessql;
      c1_rec c1%ROWTYPE;
      scn    NUMBER;
    BEGIN
      scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
      DBMS_FLASHBACK.ENABLE_AT_SYSTEM_CHANGE_NUMBER(
        query_scn => scn);
      /* Open c1 in flashback mode */
      OPEN c1;
      /* Disable Flashback */
      DBMS_FLASHBACK.DISABLE;
      LOOP
        FETCH c1 INTO c1_rec;
        EXIT WHEN c1%NOTFOUND;
        /*
        Note that all the DML operations inside the loop are performed
        with Flashback disabled
        */
        INSERT INTO charlessql@dg4msql VALUES (
          c1_rec.test1,
          c1_rec.test2);
      END LOOP;
      -- Release the cursor explicitly once all rows are copied
      CLOSE c1;
      COMMIT;
      DBMS_OUTPUT.PUT_LINE('SCN = ' || scn);
    EXCEPTION WHEN OTHERS THEN
      DBMS_FLASHBACK.DISABLE;
      -- Do not leave the cursor open when the copy fails part-way
      IF c1%ISOPEN THEN
        CLOSE c1;
      END IF;
      RAISE;
    END;
    /

    /*OPTIONAL SCN CHECK*/
    -- Records the current SCN as the instantiation SCN of scott.charlessql
    -- (source database TEST1) for the apply database link dg4msql.serfca.org.
    -- The trailing "/" is required for SQL*Plus to run the block.

    DECLARE
      iscn NUMBER; -- Variable to hold instantiation SCN value
    BEGIN
      iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
      DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
        source_object_name   => 'scott.charlessql',
        source_database_name => 'TEST1',
        instantiation_scn    => iscn,
        apply_database_link  => 'dg4msql.serfca.org');
    END;
    /

    /*APPLY PROCESS*/
    -- Creates apply process apply_stream_sql on sqlserver_queue for captured
    -- messages from TEST1, applying through database link dg4msql.serfca.org.
    -- The trailing "/" is required for SQL*Plus to run the block.

    BEGIN
      DBMS_APPLY_ADM.CREATE_APPLY(
        queue_name          => 'sqlserver_queue',
        apply_name          => 'apply_stream_sql',
        message_handler     => null,
        apply_database_link => 'dg4msql.serfca.org',
        apply_captured      => TRUE,
        source_database     => 'TEST1');
    END;
    /

    /*DML HANDLER PROCESS*/
    -- Registers strmadmin.insert_reg as the INSERT handler for
    -- scott.charlessql on apply process apply_stream_sql.
    -- The operation name is given without the trailing space of the original
    -- ('INSERT ' is not one of the documented operation names).
    -- The trailing "/" is required for SQL*Plus to run the block.

    BEGIN
      DBMS_APPLY_ADM.SET_DML_HANDLER(
        object_name         => 'scott.charlessql',
        object_type         => 'TABLE',
        operation_name      => 'INSERT',
        error_handler       => false,
        user_procedure      => 'strmadmin.insert_reg',
        apply_database_link => 'dg4msql.serfca.org',
        apply_name          => 'apply_stream_sql');
    END;
    /

    -- Start the apply process created above.
    -- The trailing "/" is required for SQL*Plus to run the block.
    BEGIN
      sys.DBMS_APPLY_ADM.START_APPLY(
        apply_name => 'apply_stream_sql');
    END;
    /

    -- Start the capture process sqlserver_capture.
    -- "BEGIN" and the package name were fused into one token in the original;
    -- the trailing "/" is required for SQL*Plus to run the block.
    BEGIN
      DBMS_CAPTURE_ADM.START_CAPTURE(
        capture_name => 'sqlserver_capture');
    END;
    /

    The problem I have is that I can see the capture getting populated, but the apply is not doing its job: I cannot see the procedure fire to insert records from the Oracle side into the SQL Server side.

    There are no rules defined.

    Can you let me know where i am going wrong, with this..

  3. neworacledba says:

    this is a great post on streams

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: