db version: 10.2.0.4
test platform: windows xp
source db: orcl
dest db: orcl1
–default all execute on source database except specified
SQL> show parameter compati
NAME TYPE VALUE
———————————— ———– ————
compatible string 10.2.0.3.0
sga_max_size big integer 276M
sga_target big integer 276M
processes integer 150
timed_statistics boolean TRUE
log_archive_dest_1 string LOCATION=D:\oracle\oradata\orcl\archive
log_archive_format string ORCL_ARC%S_%R.%T
undo_retention integer 7200
remote_archive_enable string TRUE
global_names boolean TRUE
open_links integer 10
job_queue_processes integer 10
parallel_max_servers integer 20
streams_pool_size big integer 44M
–enable database supplemental logging
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;
Database altered.
–create users for test
SQL> create user t identified by t default tablespace users temporary tablespace temp;
User created.
SQL> grant dba to t;
Grant succeeded.
conn t/t
SQL> create table mystream (id number, name varchar(100));
Table created.
SQL> insert into mystream values(1,’a');
1 row created.
SQL> commit;
Commit complete.
–dest db
create user t identified by t default tablespace users temporary tablespace temp;
grant dba to t;
–create database link
CREATE public DATABASE LINK orcl1 CONNECT TO tadmin identified by tadmin USING ‘ORCL1′;
–source and dest db
–tadmin is the stream administrator user, MUST be different from SYS and SYSTEM
CREATE USER tadmin IDENTIFIED BY tadmin DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE, DBA TO tadmin;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(grantee => ‘TADMIN’, grant_privileges => TRUE);
create directory REORG_DIR as ‘D:\oracle\reorg’; –souce db
create directory REORG_DIR as ‘C:\oracle\reorg’; –dest db
–source db
conn tadmin/tadmin
begin
DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS(
schema_names => ‘T’,
source_database => ‘ORCL’,
source_directory_object => ‘REORG_DIR’,
destination_directory_object => ‘REORG_DIR’,
destination_database => ‘ORCL1′,
perform_actions => TRUE
);
end;
/
–source db
T@orcl>select * from mystream;
ID NAME
———- ——————–
1 a
–check dest db
SQL> conn t/t
Connected.
SQL> select * from mystream;
ID NAME
———- ——————–
1 a
–add data to source table
T@orcl>insert into mystream values(2,’b');
1 row created.
T@orcl>insert into mystream values(3,’c');
1 row created.
T@orcl>commit;
Commit complete.
T@orcl>select * from mystream;
ID NAME
———- ——————–
1 a
2 b
3 c
–check dest db
SQL> select * from mystream;
ID NAME
———- ——————–
1 a
2 b
3 c
–add many data to source db
T@orcl>insert into mystream select object_id,object_name from dba_objects;
46767 rows created.
T@orcl>commit;
Commit complete.
T@orcl>select count(*) from mystream;
COUNT(*)
———-
46770
–check dest db
SQL> select count(*) from mystream;
COUNT(*)
———-
3 –it is still 3
–wait 3+ minutes and check again. It is slow.
SQL> select count(*) from mystream;
COUNT(*)
———-
46770
–let’s recreate the stream with ddl enable
–both source and dest db
conn / as sysdba
exec dbms_streams_adm.remove_streams_configuration();
drop user tadmin cascade;
CREATE USER tadmin IDENTIFIED BY tadmin DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE, DBA TO tadmin;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(grantee => ‘TADMIN’, grant_privileges => TRUE);
–dest db
drop user t cascade;
–source db
truncate table t.mystream;
–recreate with ddl enable, source db
CONN tadmin/tadmin
begin
DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS(
schema_names => ‘T’,
source_database => ‘ORCL’,
source_directory_object => ‘REORG_DIR’,
destination_directory_object => ‘REORG_DIR’,
destination_database => ‘ORCL1′,
perform_actions => TRUE,
include_ddl => TRUE);
end;
/
–add data
conn t/t
insert into mystream values (1,’ddl’);
commit;
–check dest
SQL> select * from mystream;
ID NAME
———- ——————–
1 ddl
–dml is ok, let’s do ddl test
–source db, create table
T@orcl>create table mystream2 as select * from dba_objects where 1=0;
Table created.
–check dest db
SQL> select table_name from user_tables;
TABLE_NAME
——————————
MYSTREAM
MYSTREAM2
–mystream2 is there. DDL is ok too.
–let’s test tons of data insert
T@orcl>insert into mystream2 select * from dba_objects;
46807 rows created.
T@orcl>insert into mystream2 select * from mystream2;
46807 rows created.
T@orcl>/
93614 rows created.
T@orcl>commit;
Commit complete.
T@orcl>insert into mystream2 select * from mystream2;
187228 rows created.
T@orcl>commit;
Commit complete.
–wait n minutes, check dest db
SQL> select count(*) from mystream2;
COUNT(*)
———-
0 –it still ZERO!
–check source db alert log, get this
Wed Oct 29 15:47:38 2008
Error 600 occured while spilling buffered messages
Wed Oct 29 15:47:38 2008
Errors in file d:\oracle\admin\orcl\bdump\orcl_q004_5340.trc:
ORA-00600: internal error code, arguments: [kwqbmcrcpts101], [], [], [], [], [], [], []
–according to metalink Note:435741.1, this bug is fixed in 11g
–also get low SGA error
Wed Oct 29 15:47:59 2008
Propagation Schedule for (TADMIN.ORCL$CAPQ, “TADMIN”.”ORCL$APPQ”@ORCL1) encountered following error:
ORA-23603: STREAMS enqueue aborted due to low SGA
this is my sga size:
sga_max_size big integer 276M
sga_target big integer 276M
My impress so far, stream is not stable or efficient enough in 10gr2.
Filed under: Uncategorized | Tagged: scratch, stream
–We can use the procedure DBMS_STREAMS_ADM.MAINTAIN_SCHEMAS to generate scripts. Here is an example.
SET ECHO ON
SET VERIFY OFF
WHENEVER SQLERROR EXIT SQL.SQLCODE;
——————————————————————-
– get TNSNAME and streams admin user details for both the databases
——————————————————————–
PROMPT
PROMPT ‘Enter TNS Name of site 1 as parameter 1:’
DEFINE db1 = &1
PROMPT
PROMPT ‘Enter streams admin username for site 1 as parameter 2:’
DEFINE strm_adm_db1 = &2
PROMPT
PROMPT ‘Enter streams admin password for site 1 as parameter 3:’
DEFINE strm_adm_pwd_db1 = &3
PROMPT
PROMPT ‘Enter TNS Name of site 2 as parameter 4:’
DEFINE db2 = &4
PROMPT
PROMPT ‘Enter streams admin username for site 2 as parameter 5:’
DEFINE strm_adm_db2 = &5
PROMPT
PROMPT ‘Enter streams admin password for site 2 as parameter 6:’
DEFINE strm_adm_pwd_db2 = &6
– connect as streams administrator to site 1
PROMPT Connecting as streams administrator to site 1
CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1
–
– Add supplemental log group for table “T”.”MYSTREAM”
–
BEGIN
EXECUTE IMMEDIATE ‘ALTER TABLE “T”.”MYSTREAM” ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, FOREIGN KEY, UNIQUE INDEX) COLUMNS’;
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -32588 THEN NULL; — Logging attribute exists
ELSE RAISE;
END IF;
END;
/
–
– Add supplemental log group for table “T”.”T”
–
BEGIN
EXECUTE IMMEDIATE ‘ALTER TABLE “T”.”T” ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, FOREIGN KEY, UNIQUE INDEX) COLUMNS’;
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -32588 THEN NULL; — Logging attribute exists
ELSE RAISE;
END IF;
END;
/
–
– Set up queue “TADMIN”.”ORCL$CAPQ”
–
BEGIN
dbms_streams_adm.set_up_queue(
queue_table => ‘”TADMIN”.”ORCL$CAPQT”‘,
storage_clause => NULL,
queue_name => ‘”TADMIN”.”ORCL$CAPQ”‘,
queue_user => ”);
END;
/
–
– PROPAGATE changes for schema T
–
DECLARE
version_num NUMBER := 0;
release_num NUMBER := 0;
pos NUMBER;
initpos NUMBER;
q2q BOOLEAN;
stmt VARCHAR2(100);
ver VARCHAR2(30);
compat VARCHAR2(30);
BEGIN
BEGIN
stmt := ‘BEGIN dbms_utility.db_version@ORCL1(:ver, :compat); END;’;
EXECUTE IMMEDIATE stmt USING OUT ver, OUT compat;
— Extract version number
initpos := 1;
pos := INSTR(compat, ‘.’, initpos, 1);
IF pos > 0 THEN
version_num := TO_NUMBER(SUBSTR(compat, initpos, pos – initpos));
initpos := pos + 1;
— Extract release number
pos := INSTR(compat, ‘.’, initpos, 1);
IF pos > 0 THEN
release_num := TO_NUMBER(SUBSTR(compat, initpos,
pos – initpos));
initpos := pos + 1;
ELSE
release_num := TO_NUMBER(SUBSTR(compat, initpos));
END IF;
ELSE
version_num := TO_NUMBER(SUBSTR(compat, initpos));
END IF;
— use q2q propagation if compatibility >= 10.2
IF version_num > 10 OR
(version_num = 10 AND release_num >=2) THEN
q2q := TRUE;
ELSE
q2q := FALSE;
END IF;
EXCEPTION WHEN OTHERS THEN
q2q := FALSE;
END;
dbms_streams_adm.add_schema_propagation_rules(
schema_name => ‘”T”‘,
streams_name => ”,
source_queue_name => ‘”TADMIN”.”ORCL$CAPQ”‘,
destination_queue_name => ‘”TADMIN”.”ORCL$APPQ”@ORCL1′,
include_dml => TRUE,
include_ddl => TRUE,
include_tagged_lcr => TRUE,
source_database => ‘ORCL’,
inclusion_rule => TRUE,
and_condition => NULL,
queue_to_queue => q2q);
END;
/
–
– Disable propagation. Enable after destination has been setup
–
DECLARE
q2q VARCHAR2(10);
destn_q VARCHAR2(65);
BEGIN
SELECT queue_to_queue INTO q2q
FROM dba_propagation
WHERE source_queue_owner = ‘TADMIN’ AND
source_queue_name = ‘ORCL$CAPQ’ AND
destination_queue_owner = ‘TADMIN’ AND
destination_queue_name = ‘ORCL$APPQ’ AND
destination_dblink = ‘ORCL1′;
IF q2q = ‘TRUE’ THEN
destn_q := ‘”TADMIN”.”ORCL$APPQ”‘;
ELSE
destn_q := NULL;
END IF;
dbms_aqadm.disable_propagation_schedule(
queue_name => ‘”TADMIN”.”ORCL$CAPQ”‘,
destination => ‘ORCL1′,
destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -24065 THEN NULL; — propagation already disabled
ELSE RAISE;
END IF;
END;
/
–
– CAPTURE changes for schema T
–
DECLARE
compat VARCHAR2(512);
initpos NUMBER;
pos NUMBER;
version_num NUMBER;
release_num NUMBER;
compat_func VARCHAR2(65);
get_compatible VARCHAR2(4000);
BEGIN
SELECT value INTO compat
FROM v$parameter
WHERE name = ‘compatible’;
— Extract version number
initpos := 1;
pos := INSTR(compat, ‘.’, initpos, 1);
IF pos > 0 THEN
version_num := TO_NUMBER(SUBSTR(compat, initpos, pos – initpos));
initpos := pos + 1;
— Extract release number
pos := INSTR(compat, ‘.’, initpos, 1);
IF pos > 0 THEN
release_num := TO_NUMBER(SUBSTR(compat, initpos, pos – initpos));
initpos := pos + 1;
ELSE
release_num := TO_NUMBER(SUBSTR(compat, initpos));
END IF;
END IF;
IF version_num < 10 THEN
compat_func := ‘dbms_streams.compatible_9_2′;
ELSIF version_num = 10 THEN
IF release_num < 2 THEN
compat_func := ‘dbms_streams.compatible_10_1′;
ELSE
compat_func := ‘dbms_streams.compatible_10_2′;
END IF;
ELSE
compat_func := ‘dbms_streams.compatible_10_2′;
END IF;
get_compatible := ‘:lcr.get_compatible() ‘”T”‘,
streams_type => ‘CAPTURE’,
streams_name => ‘”ORCL$CAP”‘,
queue_name => ‘”TADMIN”.”ORCL$CAPQ”‘,
include_dml => TRUE,
include_ddl => TRUE,
include_tagged_lcr => TRUE,
source_database => ‘ORCL’,
inclusion_rule => TRUE,
and_condition => get_compatible);
END;
/
–
– Datapump SCHEMA MODE EXPORT
–
DECLARE
h1 NUMBER; — data pump job handle
schema_expr_list VARCHAR2(32767); — for metadata_filter
cnt NUMBER; — temp variable
object_owner dbms_utility.uncl_array; — obj owners
job_state VARCHAR2(30); — job state
status ku$_Status; — data pump status
job_not_exist exception;
pragma exception_init(job_not_exist, -31626);
BEGIN
object_owner(1) := ‘T’;
FOR idx IN 1..1 LOOP
— schema does not exist locally, need instantiation
IF schema_expr_list IS NULL THEN
schema_expr_list := ‘(‘;
ELSE
schema_expr_list := schema_expr_list ||’,';
END IF;
schema_expr_list := schema_expr_list||””||object_owner(idx)||””;
END LOOP;
IF schema_expr_list IS NOT NULL THEN
schema_expr_list := schema_expr_list || ‘)’;
ELSE
COMMIT;
RETURN;
END IF;
h1 := dbms_datapump.open(operation=>’EXPORT’,job_mode=>’SCHEMA’,
remote_link=>”,
job_name=>NULL, version=>’COMPATIBLE’);
dbms_datapump.metadata_filter(
handle=>h1,
name=>’SCHEMA_EXPR’,
value=>’IN’||schema_expr_list);
dbms_datapump.add_file(
handle=>h1,
filename=>’expdat26.dmp’,
directory=>’REORG_DIR’,
filetype=>dbms_datapump.ku$_file_type_dump_file);
dbms_datapump.add_file(
handle=>h1,
filename=>’expdat26.dlg’,
directory=>’REORG_DIR’,
filetype=>dbms_datapump.ku$_file_type_log_file);
dbms_datapump.start_job(h1);
job_state := ‘UNDEFINED’;
BEGIN
WHILE (job_state != ‘COMPLETED’) AND (job_state != ‘STOPPED’) LOOP
status := dbms_datapump.get_status(
handle => h1,
mask => dbms_datapump.ku$_status_job_error +
dbms_datapump.ku$_status_job_status +
dbms_datapump.ku$_status_wip,
timeout => -1);
job_state := status.job_status.state;
dbms_lock.sleep(10);
END LOOP;
EXCEPTION WHEN job_not_exist THEN
dbms_output.put_line(‘job finished’);
END;
— Transfer dump file to the destination directory
dbms_file_transfer.put_file(
source_directory_object => ‘”REORG_DIR”‘,
source_file_name => ‘expdat26.dmp’,
destination_directory_object => ‘”REORG_DIR”‘,
destination_file_name => ‘expdat26.dmp’,
destination_database => ‘ORCL1′);
COMMIT;
EXCEPTION WHEN OTHERS THEN
ROLLBACK;
RAISE;
END;
/
–
– Start capture process ORCL$CAP
–
BEGIN
dbms_capture_adm.start_capture(
capture_name => ‘”ORCL$CAP”‘);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -26666 THEN NULL; — CAPTURE process already running
ELSE RAISE;
END IF;
END;
/
– connect as streams administrator to site 2
PROMPT Connecting as streams administrator to site 2
CONNECT &strm_adm_db2/&strm_adm_pwd_db2@&db2
–
– Datapump SCHEMA MODE IMPORT
–
DECLARE
h1 NUMBER; — data pump job handle
schema_expr_list VARCHAR2(32767); — for metadata_filter
cnt NUMBER; — temp variable
object_owner dbms_utility.uncl_array; — obj owners
job_state VARCHAR2(30); — job state
status ku$_Status; — data pump status
job_not_exist exception;
pragma exception_init(job_not_exist, -31626);
BEGIN
object_owner(1) := ‘T’;
FOR idx IN 1..1 LOOP
— schema does not exist locally, need instantiation
IF schema_expr_list IS NULL THEN
schema_expr_list := ‘(‘;
ELSE
schema_expr_list := schema_expr_list ||’,';
END IF;
schema_expr_list := schema_expr_list||””||object_owner(idx)||””;
END LOOP;
IF schema_expr_list IS NOT NULL THEN
schema_expr_list := schema_expr_list || ‘)’;
ELSE
COMMIT;
RETURN;
END IF;
h1 := dbms_datapump.open(operation=>’IMPORT’,job_mode=>’SCHEMA’,
remote_link=>”,
job_name=>NULL, version=>’COMPATIBLE’);
dbms_datapump.add_file(
handle=>h1,
filename=>’expdat26.dmp’,
directory=>’REORG_DIR’,
filetype=>dbms_datapump.ku$_file_type_dump_file);
dbms_datapump.add_file(
handle=>h1,
filename=>’expdat26.dlg’,
directory=>’REORG_DIR’,
filetype=>dbms_datapump.ku$_file_type_log_file);
dbms_datapump.start_job(h1);
job_state := ‘UNDEFINED’;
BEGIN
WHILE (job_state != ‘COMPLETED’) AND (job_state != ‘STOPPED’) LOOP
status := dbms_datapump.get_status(
handle => h1,
mask => dbms_datapump.ku$_status_job_error +
dbms_datapump.ku$_status_job_status +
dbms_datapump.ku$_status_wip,
timeout => -1);
job_state := status.job_status.state;
dbms_lock.sleep(10);
END LOOP;
EXCEPTION WHEN job_not_exist THEN
dbms_output.put_line(‘job finished’);
END;
COMMIT;
EXCEPTION WHEN OTHERS THEN
ROLLBACK;
RAISE;
END;
/
–
– Set up queue “TADMIN”.”ORCL$APPQ”
–
BEGIN
dbms_streams_adm.set_up_queue(
queue_table => ‘”TADMIN”.”ORCL$APPQT”‘,
storage_clause => NULL,
queue_name => ‘”TADMIN”.”ORCL$APPQ”‘,
queue_user => ”);
END;
/
–
– APPLY changes for schema T
–
DECLARE
compat VARCHAR2(512);
initpos NUMBER;
pos NUMBER;
version_num NUMBER;
release_num NUMBER;
compat_func VARCHAR2(65);
get_compatible VARCHAR2(4000);
BEGIN
SELECT value INTO compat
FROM v$parameter
WHERE name = ‘compatible’;
— Extract version number
initpos := 1;
pos := INSTR(compat, ‘.’, initpos, 1);
IF pos > 0 THEN
version_num := TO_NUMBER(SUBSTR(compat, initpos, pos – initpos));
initpos := pos + 1;
— Extract release number
pos := INSTR(compat, ‘.’, initpos, 1);
IF pos > 0 THEN
release_num := TO_NUMBER(SUBSTR(compat, initpos, pos – initpos));
initpos := pos + 1;
ELSE
release_num := TO_NUMBER(SUBSTR(compat, initpos));
END IF;
END IF;
IF version_num < 10 THEN
compat_func := ‘dbms_streams.compatible_9_2′;
ELSIF version_num = 10 THEN
IF release_num < 2 THEN
compat_func := ‘dbms_streams.compatible_10_1′;
ELSE
compat_func := ‘dbms_streams.compatible_10_2′;
END IF;
ELSE
compat_func := ‘dbms_streams.compatible_10_2′;
END IF;
get_compatible := ‘:lcr.get_compatible() ‘”T”‘,
streams_type => ‘APPLY’,
streams_name => ”,
queue_name => ‘”TADMIN”.”ORCL$APPQ”‘,
include_dml => TRUE,
include_ddl => TRUE,
include_tagged_lcr => TRUE,
source_database => ‘ORCL’,
inclusion_rule => TRUE,
and_condition => get_compatible);
END;
/
–
– Get tag value to be used for Apply
–
DECLARE
found BINARY_INTEGER := 0;
tag_num NUMBER;
apply_nm VARCHAR2(30);
apply_nm_dqt VARCHAR2(32);
BEGIN
SELECT apply_name INTO apply_nm
FROM dba_apply_progress
WHERE source_database = ‘ORCL’;
apply_nm_dqt := ‘”‘ || apply_nm || ‘”‘;
— Use the apply object id as the tag
SELECT o.object_id INTO tag_num
FROM dba_objects o
WHERE o.object_name= apply_nm AND
o.object_type=’APPLY’;
LOOP
BEGIN
found := 0;
SELECT 1 INTO found FROM dba_apply
WHERE apply_name != apply_nm AND
apply_tag = hextoraw(tag_num);
EXCEPTION WHEN no_data_found THEN
EXIT;
END;
EXIT WHEN (found = 0);
tag_num := tag_num + 1;
END LOOP;
— alter apply
dbms_apply_adm.alter_apply(
apply_name => apply_nm_dqt,
apply_tag => hextoraw(tag_num));
END;
/
–
– Start apply process applying changes from ORCL
–
DECLARE
apply_nm VARCHAR2(32);
apply_nm_dqt VARCHAR2(32);
BEGIN
SELECT apply_name INTO apply_nm
FROM dba_apply_progress
WHERE source_database = ‘ORCL’;
apply_nm_dqt := ‘”‘ || apply_nm || ‘”‘;
dbms_apply_adm.start_apply(
apply_name => apply_nm_dqt);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -26666 THEN NULL; — APPLY process already running
ELSE RAISE;
END IF;
END;
/
– connect as streams administrator to site 1
PROMPT Connecting as streams administrator to site 1
CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1
–
– Enable propagation schedule for “TADMIN”.”ORCL$CAPQ”
– to ORCL1
–
DECLARE
q2q VARCHAR2(10);
destn_q VARCHAR2(65);
BEGIN
SELECT queue_to_queue INTO q2q
FROM dba_propagation
WHERE source_queue_owner = ‘TADMIN’ AND
source_queue_name = ‘ORCL$CAPQ’ AND
destination_queue_owner = ‘TADMIN’ AND
destination_queue_name = ‘ORCL$APPQ’ AND
destination_dblink = ‘ORCL1′;
IF q2q = ‘TRUE’ THEN
destn_q := ‘”TADMIN”.”ORCL$APPQ”‘;
ELSE
destn_q := NULL;
END IF;
dbms_aqadm.enable_propagation_schedule(
queue_name => ‘”TADMIN”.”ORCL$CAPQ”‘,
destination => ‘ORCL1′,
destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
IF sqlcode = -24064 THEN NULL; — propagation already enabled
ELSE RAISE;
END IF;
END;
/