pgq的实际应用案例, 在线增量复制的实施案例.

创建源库

```
postgres=# create database src;
CREATE DATABASE
```

创建目标库

```
postgres=# create database dest;
CREATE DATABASE
```

连接到源库

```
\c src
```

创建测试表

组1, 这两个表有外键关联, 在一个事务中操作, 在事务中的所有表的跟踪记录必须插入同一个记录表.

```
-- Group 1 parent table.
CREATE TABLE grp1_tbl1 (
    id       bigint PRIMARY KEY,
    info     text,
    crt_time timestamp without time zone
);

-- Group 1 child table. The foreign key is checked at commit time
-- (DEFERRABLE INITIALLY DEFERRED), not per statement.
CREATE TABLE grp1_tbl2 (
    id       bigint PRIMARY KEY,
    tbl1_id  bigint REFERENCES grp1_tbl1 (id) DEFERRABLE INITIALLY DEFERRED,
    info     text,
    crt_time timestamp without time zone
);
```

组2, 这两个表有外键关联, 在一个事务中操作, 在事务中的所有表的跟踪记录必须插入同一个记录表.

```
-- Group 2 parent table.
CREATE TABLE grp2_tbl1 (
    id       bigint PRIMARY KEY,
    info     text,
    crt_time timestamp without time zone
);

-- Group 2 child table. The foreign key is checked at commit time
-- (DEFERRABLE INITIALLY DEFERRED), not per statement.
CREATE TABLE grp2_tbl2 (
    id       bigint PRIMARY KEY,
    tbl1_id  bigint REFERENCES grp2_tbl1 (id) DEFERRABLE INITIALLY DEFERRED,
    info     text,
    crt_time timestamp without time zone
);
```

组3, 这两个表有外键关联, 在一个事务中操作, 在事务中的所有表的跟踪记录必须插入同一个记录表.

```
-- Group 3 parent table.
CREATE TABLE grp3_tbl1 (
    id       bigint PRIMARY KEY,
    info     text,
    crt_time timestamp without time zone
);

-- Group 3 child table. The foreign key is checked at commit time
-- (DEFERRABLE INITIALLY DEFERRED), not per statement.
CREATE TABLE grp3_tbl2 (
    id       bigint PRIMARY KEY,
    tbl1_id  bigint REFERENCES grp3_tbl1 (id) DEFERRABLE INITIALLY DEFERRED,
    info     text,
    crt_time timestamp without time zone
);
```

创建pgbench测试脚本, 三组测试表分别使用3个事务, 每个事务对组内2个表各执行2次插入或更新(upsert), 并对第2个表执行1次删除.
```
vi test.sql
\setrandom grp1_tbl1_id 1 1000000
\setrandom grp1_tbl2_id 1 2000000
\setrandom grp2_tbl1_id 1 1000000
\setrandom grp2_tbl2_id 1 2000000
\setrandom grp3_tbl1_id 1 1000000
\setrandom grp3_tbl2_id 1 2000000
begin;
insert into grp1_tbl1 (id,info,crt_time) values (:grp1_tbl1_id, md5(random()::text), now()) on conflict ON CONSTRAINT grp1_tbl1_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp1_tbl2 (id,tbl1_id,info,crt_time) values (:grp1_tbl2_id, :grp1_tbl1_id, md5(random()::text), now()) on conflict ON CONSTRAINT grp1_tbl2_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp1_tbl1 (id,info,crt_time) values (:grp1_tbl1_id+1, md5(random()::text), now()) on conflict ON CONSTRAINT grp1_tbl1_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp1_tbl2 (id,tbl1_id,info,crt_time) values (:grp1_tbl2_id+1, :grp1_tbl1_id+1, md5(random()::text), now()) on conflict ON CONSTRAINT grp1_tbl2_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
delete from grp1_tbl2 where id = (:grp1_tbl2_id+100);
end;
begin;
insert into grp2_tbl1 (id,info,crt_time) values (:grp2_tbl1_id, md5(random()::text), now()) on conflict ON CONSTRAINT grp2_tbl1_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp2_tbl2 (id,tbl1_id,info,crt_time) values (:grp2_tbl2_id, :grp2_tbl1_id, md5(random()::text), now()) on conflict ON CONSTRAINT grp2_tbl2_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp2_tbl1 (id,info,crt_time) values (:grp2_tbl1_id+1, md5(random()::text), now()) on conflict ON CONSTRAINT grp2_tbl1_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp2_tbl2 (id,tbl1_id,info,crt_time) values (:grp2_tbl2_id+1, :grp2_tbl1_id+1, md5(random()::text), now()) on conflict ON CONSTRAINT grp2_tbl2_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
delete from grp2_tbl2 where id = (:grp2_tbl2_id+100);
end;
begin;
insert into grp3_tbl1 (id,info,crt_time) values (:grp3_tbl1_id, md5(random()::text), now()) on conflict ON CONSTRAINT grp3_tbl1_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp3_tbl2 (id,tbl1_id,info,crt_time) values (:grp3_tbl2_id, :grp3_tbl1_id, md5(random()::text), now()) on conflict ON CONSTRAINT grp3_tbl2_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp3_tbl1 (id,info,crt_time) values (:grp3_tbl1_id+1, md5(random()::text), now()) on conflict ON CONSTRAINT grp3_tbl1_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
insert into grp3_tbl2 (id,tbl1_id,info,crt_time) values (:grp3_tbl2_id+1, :grp3_tbl1_id+1, md5(random()::text), now()) on conflict ON CONSTRAINT grp3_tbl2_pkey do update set info=excluded.info,crt_time=excluded.crt_time;
delete from grp3_tbl2 where id = (:grp3_tbl2_id+100);
end;
```

生成一部分数据

```
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 20 src
```

创建hstore扩展

```
create extension hstore;
```

创建mq schema

```
CREATE SCHEMA IF NOT EXISTS mq;
```

创建获取事务结束时间的函数

```
-- Returns one timestamp per transaction.
--
-- The first call in a transaction takes clock_timestamp() and caches it in
-- the custom setting commit_time.realval with transaction-local scope; later
-- calls in the same transaction return the cached value, so every row traced
-- in that transaction gets the same timestamp.
--
-- Returns: timestamp without time zone.
create or replace function mq.get_commit_time()
returns timestamp without time zone as $$
declare
  res timestamp without time zone;
begin
  -- Raises undefined_object when the setting has never been defined in this
  -- session, and a datetime parse error when it has reverted to the empty
  -- string after an earlier transaction ended.
  res := current_setting('commit_time.realval')::timestamp without time zone;
  return res;
exception
  -- Only the "not cached yet" conditions are handled; any other error is
  -- left to propagate instead of being swallowed by WHEN OTHERS.
  when undefined_object or invalid_datetime_format or datetime_field_overflow then
    res := clock_timestamp();
    -- Third argument true = transaction-local, the equivalent of SET LOCAL,
    -- without building the statement by string concatenation.
    perform set_config('commit_time.realval', res::text, true);
    return res;
end;
$$ language plpgsql;
```

创建三组跟踪记录表, 实际生产中可以根据需要创建多组记录表, 多组记录表的好处是, 不同的记录表, 在目标端可以并行回放. 如果同一个事务要操作多个表, 这些表的跟踪记录必须记录到同一个记录表, 以保证回放时的事务一致性.
第1组跟踪记录表

```
CREATE TABLE mq.table_change_rec_grp1 (
  id serial8 primary key,
  x_id int8 default txid_current(), -- transaction id
  consumed boolean not null default false, -- whether the record has been consumed
  relid oid, -- pg_class.oid
  table_schema name, -- schema name
  table_name name, -- table name
  when_tg text, -- after or before
  level text, -- statement or row
  op text, -- delete, update, or insert or truncate
  old_rec hstore,
  new_rec hstore,
  crt_time timestamp without time zone not null, -- time
  dbname name, -- database name
  username name, -- user name
  client_addr inet, -- client address
  client_port int -- client port
);

create index x_id_table_change_rec_grp1 on mq.table_change_rec_grp1(x_id) where consumed=false;
create index crt_time_id_table_change_rec_grp1 on mq.table_change_rec_grp1(crt_time,id) where consumed=false;

-- One child table per day of week (suffix 0-6), matching the
-- EXTRACT(DOW ...) routing done by mq.dml_trace_grp1().
create table mq.table_change_rec_grp1_0 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
create table mq.table_change_rec_grp1_1 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
create table mq.table_change_rec_grp1_2 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
create table mq.table_change_rec_grp1_3 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
create table mq.table_change_rec_grp1_4 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
create table mq.table_change_rec_grp1_5 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
create table mq.table_change_rec_grp1_6 (like mq.table_change_rec_grp1 including all) inherits(mq.table_change_rec_grp1);
```

第1组跟踪记录表对应的触发器函数

```
-- Row-level change-capture trigger function for group 1.
--
-- For INSERT / UPDATE / DELETE it stores the old and/or new row image (as
-- hstore) plus trigger and session metadata in the child table
-- mq.table_change_rec_grp1_<dow>, where <dow> is the day of week (0-6) of the
-- timestamp returned by mq.get_commit_time(). Any other TG_OP is ignored.
--
-- Returns: always NULL.
CREATE OR REPLACE FUNCTION mq.dml_trace_grp1() RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
  v_new_rec     hstore;  -- stays NULL for DELETE
  v_old_rec     hstore;  -- stays NULL for INSERT
  v_username    name := session_user;
  v_dbname      name := current_database();
  v_client_addr inet := inet_client_addr();
  v_client_port int  := inet_client_port();
  v_crt_time    timestamp without time zone := mq.get_commit_time();
  v_dofweek     int  := EXTRACT(DOW FROM v_crt_time);
BEGIN
  -- Capture only the row images that exist for this operation.
  CASE TG_OP
    WHEN 'DELETE' THEN
      v_old_rec := hstore(OLD.*);
    WHEN 'INSERT' THEN
      v_new_rec := hstore(NEW.*);
    WHEN 'UPDATE' THEN
      v_old_rec := hstore(OLD.*);
      v_new_rec := hstore(NEW.*);
    ELSE
      RETURN NULL;
  END CASE;

  -- One static INSERT per child table so each statement keeps a cached plan.
  -- Writing NULL for the missing image equals omitting the column.
  CASE v_dofweek
    WHEN 0 THEN
      INSERT INTO mq.table_change_rec_grp1_0
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 1 THEN
      INSERT INTO mq.table_change_rec_grp1_1
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 2 THEN
      INSERT INTO mq.table_change_rec_grp1_2
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 3 THEN
      INSERT INTO mq.table_change_rec_grp1_3
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 4 THEN
      INSERT INTO mq.table_change_rec_grp1_4
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 5 THEN
      INSERT INTO mq.table_change_rec_grp1_5
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 6 THEN
      INSERT INTO mq.table_change_rec_grp1_6
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
  END CASE;

  RETURN NULL;
END;
$BODY$ strict;
```

第2组跟踪记录表

```
CREATE TABLE mq.table_change_rec_grp2 (
  id serial8 primary key,
  x_id int8 default txid_current(), -- transaction id
  consumed boolean not null default false, -- whether the record has been consumed
  relid oid, -- pg_class.oid
  table_schema name, -- schema name
  table_name name, -- table name
  when_tg text, -- after or before
  level text, -- statement or row
  op text, -- delete, update, or insert or truncate
  old_rec hstore,
  new_rec hstore,
  crt_time timestamp without time zone not null, -- time
  dbname name, -- database name
  username name, -- user name
  client_addr inet, -- client address
  client_port int -- client port
);

create index x_id_table_change_rec_grp2 on mq.table_change_rec_grp2(x_id) where consumed=false;
create index crt_time_id_table_change_rec_grp2 on
mq.table_change_rec_grp2(crt_time,id) where consumed=false;

-- One child table per day of week (suffix 0-6), matching the
-- EXTRACT(DOW ...) routing done by mq.dml_trace_grp2().
create table mq.table_change_rec_grp2_0 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
create table mq.table_change_rec_grp2_1 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
create table mq.table_change_rec_grp2_2 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
create table mq.table_change_rec_grp2_3 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
create table mq.table_change_rec_grp2_4 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
create table mq.table_change_rec_grp2_5 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
create table mq.table_change_rec_grp2_6 (like mq.table_change_rec_grp2 including all) inherits(mq.table_change_rec_grp2);
```

第2组跟踪记录表对应的触发器函数

```
-- Row-level change-capture trigger function for group 2.
--
-- For INSERT / UPDATE / DELETE it stores the old and/or new row image (as
-- hstore) plus trigger and session metadata in the child table
-- mq.table_change_rec_grp2_<dow>, where <dow> is the day of week (0-6) of the
-- timestamp returned by mq.get_commit_time(). Any other TG_OP is ignored.
--
-- Returns: always NULL.
CREATE OR REPLACE FUNCTION mq.dml_trace_grp2() RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
  v_new_rec     hstore;  -- stays NULL for DELETE
  v_old_rec     hstore;  -- stays NULL for INSERT
  v_username    name := session_user;
  v_dbname      name := current_database();
  v_client_addr inet := inet_client_addr();
  v_client_port int  := inet_client_port();
  v_crt_time    timestamp without time zone := mq.get_commit_time();
  v_dofweek     int  := EXTRACT(DOW FROM v_crt_time);
BEGIN
  -- Capture only the row images that exist for this operation.
  CASE TG_OP
    WHEN 'DELETE' THEN
      v_old_rec := hstore(OLD.*);
    WHEN 'INSERT' THEN
      v_new_rec := hstore(NEW.*);
    WHEN 'UPDATE' THEN
      v_old_rec := hstore(OLD.*);
      v_new_rec := hstore(NEW.*);
    ELSE
      RETURN NULL;
  END CASE;

  -- One static INSERT per child table so each statement keeps a cached plan.
  -- Writing NULL for the missing image equals omitting the column.
  CASE v_dofweek
    WHEN 0 THEN
      INSERT INTO mq.table_change_rec_grp2_0
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 1 THEN
      INSERT INTO mq.table_change_rec_grp2_1
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 2 THEN
      INSERT INTO mq.table_change_rec_grp2_2
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 3 THEN
      INSERT INTO mq.table_change_rec_grp2_3
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 4 THEN
      INSERT INTO mq.table_change_rec_grp2_4
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 5 THEN
      INSERT INTO mq.table_change_rec_grp2_5
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 6 THEN
      INSERT INTO mq.table_change_rec_grp2_6
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
  END CASE;

  RETURN NULL;
END;
$BODY$ strict;
```

第3组跟踪记录表

```
CREATE TABLE mq.table_change_rec_grp3 (
  id serial8 primary key,
  x_id int8 default txid_current(), -- transaction id
  consumed boolean not null default false, -- whether the record has been consumed
  relid oid, -- pg_class.oid
  table_schema name, -- schema name
  table_name name, -- table name
  when_tg text, -- after or before
  level text, -- statement or row
  op text, -- delete, update, or insert or truncate
  old_rec hstore,
  new_rec hstore,
  crt_time timestamp without time zone not null, -- time
  dbname name, -- database name
  username name, -- user name
  client_addr inet, -- client address
  client_port int -- client port
);

create index x_id_table_change_rec_grp3 on mq.table_change_rec_grp3(x_id) where consumed=false;
create index crt_time_id_table_change_rec_grp3 on mq.table_change_rec_grp3(crt_time,id) where consumed=false;

create table mq.table_change_rec_grp3_0 (like mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
create table mq.table_change_rec_grp3_1 (like mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
create table mq.table_change_rec_grp3_2 (like mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
create table mq.table_change_rec_grp3_3 (like mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
create table mq.table_change_rec_grp3_4 (like mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
create table mq.table_change_rec_grp3_5 (like
mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
create table mq.table_change_rec_grp3_6 (like mq.table_change_rec_grp3 including all) inherits(mq.table_change_rec_grp3);
```

第3组跟踪记录表对应的触发器函数

```
-- Row-level change-capture trigger function for group 3.
--
-- For INSERT / UPDATE / DELETE it stores the old and/or new row image (as
-- hstore) plus trigger and session metadata in the child table
-- mq.table_change_rec_grp3_<dow>, where <dow> is the day of week (0-6) of the
-- timestamp returned by mq.get_commit_time(). Any other TG_OP is ignored.
--
-- Returns: always NULL.
CREATE OR REPLACE FUNCTION mq.dml_trace_grp3() RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
  v_new_rec     hstore;  -- stays NULL for DELETE
  v_old_rec     hstore;  -- stays NULL for INSERT
  v_username    name := session_user;
  v_dbname      name := current_database();
  v_client_addr inet := inet_client_addr();
  v_client_port int  := inet_client_port();
  v_crt_time    timestamp without time zone := mq.get_commit_time();
  v_dofweek     int  := EXTRACT(DOW FROM v_crt_time);
BEGIN
  -- Capture only the row images that exist for this operation.
  CASE TG_OP
    WHEN 'DELETE' THEN
      v_old_rec := hstore(OLD.*);
    WHEN 'INSERT' THEN
      v_new_rec := hstore(NEW.*);
    WHEN 'UPDATE' THEN
      v_old_rec := hstore(OLD.*);
      v_new_rec := hstore(NEW.*);
    ELSE
      RETURN NULL;
  END CASE;

  -- One static INSERT per child table so each statement keeps a cached plan.
  -- Writing NULL for the missing image equals omitting the column.
  CASE v_dofweek
    WHEN 0 THEN
      INSERT INTO mq.table_change_rec_grp3_0
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 1 THEN
      INSERT INTO mq.table_change_rec_grp3_1
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 2 THEN
      INSERT INTO mq.table_change_rec_grp3_2
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 3 THEN
      INSERT INTO mq.table_change_rec_grp3_3
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 4 THEN
      INSERT INTO mq.table_change_rec_grp3_4
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 5 THEN
      INSERT INTO mq.table_change_rec_grp3_5
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
    WHEN 6 THEN
      INSERT INTO mq.table_change_rec_grp3_6
        (relid, table_schema, table_name, when_tg, level, op, old_rec, new_rec, crt_time, dbname, username, client_addr, client_port)
      VALUES
        (tg_relid, tg_table_schema, tg_table_name, tg_when, tg_level, tg_op, v_old_rec, v_new_rec, v_crt_time, v_dbname, v_username, v_client_addr, v_client_port);
  END CASE;

  RETURN NULL;
END;
$BODY$ strict;
```

为第1组测试表创建触发器

```
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON grp1_tbl1 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE
PROCEDURE mq.dml_trace_grp1();
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON grp1_tbl2 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace_grp1();
```
为第2组测试表创建触发器函数
```
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON grp2_tbl1 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace_grp2();
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON grp2_tbl2 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace_grp2();
```
为第3组测试表创建触发器函数
```
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON grp3_tbl1 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace_grp3();
CREATE CONSTRAINT TRIGGER tg AFTER INSERT OR DELETE OR UPDATE ON grp3_tbl2 DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE mq.dml_trace_grp3();
```
创建组1消费函数
```
create or replace function mq.build_sql_grp1(n int) returns setof text as $$
-- mq.build_sql_grp1(n): consume pending change records of group 1 and return
-- the SQL statements that replay them on the target database.
--
-- Parameter
--   n   batch size. n <= 0 returns no rows at all.
--       n = 1 consumes every unconsumed record that shares the x_id of the
--             oldest unconsumed record (x_id is presumably the source
--             transaction id -- confirm against the queue table definition).
--       n > 1 takes the crt_time of the n-th oldest unconsumed record and
--             consumes every record with crt_time <= that value, so rows that
--             share a timestamp are never split across calls; when fewer than
--             n records remain, all of them are consumed.
--
-- Returns
--   'BEGIN;', one statement per consumed record (in crt_time, id order),
--   then 'END;'.
--
-- Side effects
--   Each record whose statement is returned is flagged consumed = true.
--   Records with an op other than INSERT/UPDATE/DELETE are left untouched.
--
-- Notes carried over from the original implementation:
--   * crt_time is a commit time: the tracking triggers are deferred, fire at
--     the end of the transaction and obtain the time from
--     mq.get_commit_time(), so all rows of one transaction share a timestamp.
--   * Replay order follows commit order; the smallest atomic unit is a
--     transaction. Statements inside one transaction are ordered by id.
--   * When several transactions commit at the same instant their rows may be
--     interleaved. Batch replay merges them into one transaction, which does
--     not affect consistency; with n = 1 the transaction replayed first is
--     arbitrary.
--   * Tables without a primary key that contain duplicate rows cannot be
--     replayed consistently (the generated WHERE matches every duplicate);
--     require a primary key or a NOT NULL unique key on tracked tables.
--
-- Only ONE of the seven queue tables (mq.table_change_rec_grp1_0 .. _6) is
-- processed per call: the one holding the oldest unconsumed record.
declare
  v_rec            mq.table_change_rec_grp1;  -- tracking record under the cursor
  v_tablename      name;                      -- unqualified name of the chosen queue table
  v_queue          text;                      -- schema-qualified, quoted queue table name
  v_crt_time       timestamp without time zone;
  v_x_id           int8;
  v_row_count      bigint;
  curs1            refcursor;
  v_sql            text := '';
  v_cols           text := '';
  v_vals           text := '';
  v_upd_set        text := '';
  v_upd_del_where  text := '';
begin
  if n <= 0 then
    return;
  end if;

  return next 'BEGIN;';

  -- Pick the queue table whose oldest unconsumed record is the earliest.
  -- Every branch is an aggregate without GROUP BY, so exactly seven rows come
  -- back and v_tablename is always one of the seven literals below (empty
  -- tables yield a NULL crt_time, which sorts last).
  select tablename, crt_time
    into v_tablename, v_crt_time
  from (
    select 'table_change_rec_grp1_0' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_0 where consumed=false
    union all
    select 'table_change_rec_grp1_1' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_1 where consumed=false
    union all
    select 'table_change_rec_grp1_2' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_2 where consumed=false
    union all
    select 'table_change_rec_grp1_3' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_3 where consumed=false
    union all
    select 'table_change_rec_grp1_4' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_4 where consumed=false
    union all
    select 'table_change_rec_grp1_5' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_5 where consumed=false
    union all
    select 'table_change_rec_grp1_6' as tablename, min(crt_time) as crt_time from mq.table_change_rec_grp1_6 where consumed=false
  ) t
  order by crt_time
  limit 1;

  -- The table name comes from the fixed literal list above, never from user
  -- input; %I still quotes it defensively.
  v_queue := format('mq.%I', v_tablename);

  if n = 1 then
    -- Single-transaction replay: everything sharing the x_id of the oldest record.
    execute format('select x_id from %s where consumed=false order by crt_time,id limit 1', v_queue)
      into v_x_id;
    open curs1 for execute
      format('select * from %s where consumed=false and x_id=$1 order by crt_time,id for update', v_queue)
      using v_x_id;
  else
    -- Batch replay: find the commit time of the n-th oldest record.
    execute format('select crt_time from %s where consumed=false order by crt_time,id limit 1 offset $1', v_queue)
      into v_crt_time
      using n - 1;
    -- EXECUTE does not set FOUND, so use ROW_COUNT to learn whether an n-th record exists.
    get diagnostics v_row_count = row_count;
    if v_row_count > 0 then
      open curs1 for execute
        format('select * from %s where consumed=false and crt_time<=$1 order by crt_time,id for update', v_queue)
        using v_crt_time;
    else
      -- n exceeds the number of remaining tracking records: take them all.
      open curs1 for execute
        format('select * from %s where consumed=false order by crt_time,id for update', v_queue);
    end if;
  end if;

  loop
    fetch curs1 into v_rec;
    exit when not found;

    -- Identifiers are wrapped with quote_ident, values with quote_nullable.
    -- UPDATE and DELETE locate the target row by matching every column of the
    -- old row image. The odd spacing reproduces the text the previous
    -- implementation emitted, so the generated statements are byte-identical.
    if v_rec.op in ('UPDATE', 'DELETE') then
      select string_agg(
               case
                 when e.v is null then ' ' || quote_ident(e.k) || ' is null '
                 else ' ' || quote_ident(e.k) || '=' || quote_nullable(e.v)
               end,
               ' and' order by e.ord) || ' '
        into v_upd_del_where
      from each(v_rec.old_rec) with ordinality as e(k, v, ord);
    end if;

    case v_rec.op
      when 'INSERT' then
        -- Column list and value list from the new row image.
        select string_agg(quote_ident(e.k), ',' order by e.ord),
               string_agg(quote_nullable(e.v), ',' order by e.ord)
          into v_cols, v_vals
        from each(v_rec.new_rec) with ordinality as e(k, v, ord);

        v_sql := 'insert into '||quote_ident(v_rec.table_schema)||'.'||quote_ident(v_rec.table_name)
                 ||'('||v_cols||')'||' values('||v_vals||');';

      when 'UPDATE' then
        -- SET list from the new row image.
        select string_agg(quote_ident(e.k) || '=' || quote_nullable(e.v), ',' order by e.ord)
          into v_upd_set
        from each(v_rec.new_rec) with ordinality as e(k, v, ord);

        v_sql := 'update '||quote_ident(v_rec.table_schema)||'.'||quote_ident(v_rec.table_name)
                 ||' set '||v_upd_set||' where '|| v_upd_del_where ||';';

      when 'DELETE' then
        v_sql := 'delete from '||quote_ident(v_rec.table_schema)||'.'||quote_ident(v_rec.table_name)
                 ||' where '|| v_upd_del_where ||';';

      else
        -- Unknown operation: emit nothing and leave the record unconsumed.
        continue;
    end case;

    -- A NULL statement means a required row image was NULL or empty; fail
    -- loudly instead of handing a NULL statement to the replayer.
    if v_sql is null then
      raise exception 'cannot build replay SQL for % on %.% (old_rec/new_rec missing or empty)',
        v_rec.op, v_rec.table_schema, v_rec.table_name;
    end if;

    -- Flag the record under the cursor as consumed. The queue table is only
    -- known at run time, so the cursor is referenced by its portal name.
    execute format('update %s set consumed=true where current of %I', v_queue, curs1::text);

    return next v_sql;
  end loop;

  close curs1;
  return next 'END;';
  return;
end;
$$ language plpgsql strict ;
```
创建组2消费函数
```
create or replace function mq.build_sql_grp2(n int) returns setof text as $$
declare
  m int := 0;
  v_table_change_rec_grp2 mq.table_change_rec_grp2;
  v_tablename name;
  v_crt_time timestamp without time zone;
  curs1 refcursor;
  v_sql text := '';
  v_cols text := '';
  v_vals text := '';
  v_upd_set text := '';
  v_upd_del_where text :='';
  v_x_id int8 ;
  v_max_crt_time timestamp without time zone;
begin
  if n <=0 then
    -- raise notice 'n must be > 0.';
    return;
  end if;
  return next 'BEGIN;';
  -- pick the queue table whose oldest unconsumed record is the earliest
  select tablename,crt_time into v_tablename,v_crt_time from (
    select 'table_change_rec_grp2_0' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_0 where consumed=false
    union all
    select 'table_change_rec_grp2_1' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_1 where consumed=false
    union all
    select 'table_change_rec_grp2_2' as
tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_2 where consumed=false union all select 'table_change_rec_grp2_3' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_3 where consumed=false union all select 'table_change_rec_grp2_4' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_4 where consumed=false union all select 'table_change_rec_grp2_5' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_5 where consumed=false union all select 'table_change_rec_grp2_6' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp2_6 where consumed=false ) t order by crt_time limit 1; case v_tablename when 'table_change_rec_grp2_0' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_0 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_0 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_0 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_0 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_0 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 
'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_0 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_0 set 
consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_0 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; when 'table_change_rec_grp2_1' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. 
if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_1 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_1 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_1 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_1 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_1 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_1 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || 
quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_1 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice 
'%', v_sql; update mq.table_change_rec_grp2_1 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; when 'table_change_rec_grp2_2' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_2 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_2 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_2 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_2 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_2 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols 
:= rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_2 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_2 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || 
quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_2 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; when 'table_change_rec_grp2_3' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. 
if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_3 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_3 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_3 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_3 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_3 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_3 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || 
quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_3 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice 
'%', v_sql; update mq.table_change_rec_grp2_3 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; when 'table_change_rec_grp2_4' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_4 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_4 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_4 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_4 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_4 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols 
:= rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_4 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_4 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || 
quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_4 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; when 'table_change_rec_grp2_5' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. 
if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_5 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_5 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_5 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_5 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_5 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_5 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || 
quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_5 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice 
'%', v_sql; update mq.table_change_rec_grp2_5 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; when 'table_change_rec_grp2_6' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp2_6 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp2_6 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp2_6 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp2_6 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp2_6 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp2; LOOP if found then -- raise notice '%', v_table_change_rec_grp2; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp2.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; end loop; v_cols 
:= rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_6 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_6 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp2.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || 
quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp2.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp2.table_schema)||'.'||quote_ident(v_table_change_rec_grp2.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp2_6 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp2.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp2; END LOOP; else -- raise notice 'no % queue table deal code in this function.', v_tablename; return; end case; end; $$ language plpgsql strict ; ``` 创建组3消费函数 ``` create or replace function mq.build_sql_grp3(n int) returns setof text as $$ declare m int := 0; v_table_change_rec_grp3 mq.table_change_rec_grp3; v_tablename name; v_crt_time timestamp without time zone; curs1 refcursor; v_sql text := ''; v_cols text := ''; v_vals text := ''; v_upd_set text := ''; v_upd_del_where text :=''; v_x_id int8 ; v_max_crt_time timestamp without time zone; begin if n <=0 then -- raise notice 'n must be > 0.'; return; end if; return next 'BEGIN;'; -- 取一个最小的队列表 select tablename,crt_time into v_tablename,v_crt_time from ( select 'table_change_rec_grp3_0' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_0 where consumed=false union all select 'table_change_rec_grp3_1' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_1 where consumed=false union all select 'table_change_rec_grp3_2' as 
tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_2 where consumed=false union all select 'table_change_rec_grp3_3' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_3 where consumed=false union all select 'table_change_rec_grp3_4' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_4 where consumed=false union all select 'table_change_rec_grp3_5' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_5 where consumed=false union all select 'table_change_rec_grp3_6' as tablename,min(crt_time) as crt_time from mq.table_change_rec_grp3_6 where consumed=false ) t order by crt_time limit 1; case v_tablename when 'table_change_rec_grp3_0' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_0 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_0 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_0 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_0 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_0 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 
'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_0 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_0 set 
consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_0 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; when 'table_change_rec_grp3_1' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. 
if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_1 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_1 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_1 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_1 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_1 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_1 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || 
quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_1 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice 
'%', v_sql; update mq.table_change_rec_grp3_1 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; when 'table_change_rec_grp3_2' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_2 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_2 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_2 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_2 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_2 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols 
:= rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_2 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_2 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || 
quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_2 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; when 'table_change_rec_grp3_3' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. 
if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_3 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_3 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_3 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_3 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_3 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_3 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || 
quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_3 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice 
'%', v_sql; update mq.table_change_rec_grp3_3 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; when 'table_change_rec_grp3_4' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_4 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_4 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_4 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_4 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_4 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols 
:= rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_4 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_4 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || 
quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_4 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; when 'table_change_rec_grp3_5' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. 
if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_5 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_5 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_5 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_5 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_5 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols := rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_5 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || 
quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_5 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice 
'%', v_sql; update mq.table_change_rec_grp3_5 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; when 'table_change_rec_grp3_6' then -- 获取提交时间( 每个事务的结束时间获取原理, 通过延迟触发器, 在事务结束时触发行触发器, 通过mq.get_commit_time()函数获取时间, 可以确保事务内所有row的时间戳一致. ) -- 回放顺序, 和事务提交顺序一致. 最小原子单位为事务. -- 单个事务包含多个SQL时, 可以通过command id来区分先后顺序, 或者通过序列来区分先后顺序. -- 多个事务同一时刻提交, 如果时间戳一致, 如果每个事务都包含多ROW, 则可能会混合顺序执行. 批量回放时合并成一个事务回放, 不影响一致性. 单一事务回放时, 随机选取哪个事务先执行. if n=1 then select x_id into v_x_id from mq.table_change_rec_grp3_6 where consumed=false order by crt_time,id limit 1; open curs1 for select * from mq.table_change_rec_grp3_6 where consumed=false and x_id=v_x_id order by crt_time,id for update; else select crt_time into v_crt_time from mq.table_change_rec_grp3_6 where consumed=false order by crt_time,id limit 1 offset n-1; if found then open curs1 for select * from mq.table_change_rec_grp3_6 where consumed=false and crt_time<=v_crt_time order by crt_time,id for update; else -- n超出所剩跟踪记录 open curs1 for select * from mq.table_change_rec_grp3_6 where consumed=false order by crt_time,id for update; end if; end if; fetch curs1 into v_table_change_rec_grp3; LOOP if found then -- raise notice '%', v_table_change_rec_grp3; -- build sql -- case tg insert,update,delete,ddl -- quote_ident 封装schema,tablename,column -- quote_nullable 封装value -- 不带主键的表, 如果有重复行, 使用ctid在源端操作单行, 会导致目标端不一致(避免使用ctid, 或者强制要求主键或非空唯一) case v_table_change_rec_grp3.op when 'INSERT' then -- 组装COLUMNS, VALUES v_cols := '' ; v_vals := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_cols := v_cols || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || ',' ; v_vals := v_vals || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; end loop; v_cols 
:= rtrim(v_cols, ',') ; v_vals := rtrim(v_vals, ',') ; -- 组装SQL v_sql := 'insert into '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||'('||v_cols||')'||' values('||v_vals||');' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_6 set consumed=true where current of curs1; return next v_sql; when 'UPDATE' then -- 组装COLUMNS, VALUES v_upd_set := '' ; v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.new_rec),1) loop v_upd_set := v_upd_set || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.new_rec))[i][2]) || ',' ; if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_set := rtrim(v_upd_set, ',') ; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'update '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' set '||v_upd_set||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_6 set consumed=true where current of curs1; return next v_sql; when 'DELETE' then -- 组装COLUMNS, VALUES v_upd_del_where := '' ; for i in 1..array_length(hstore_to_matrix(v_table_change_rec_grp3.old_rec),1) loop if quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) = 'NULL' then v_upd_del_where := v_upd_del_where || ' ' || 
quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || ' is null ' || ' and'; else v_upd_del_where := v_upd_del_where || ' ' || quote_ident((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][1]) || '=' || quote_nullable((hstore_to_matrix(v_table_change_rec_grp3.old_rec))[i][2]) || ' and'; end if; end loop; v_upd_del_where := rtrim(v_upd_del_where, 'd') ; v_upd_del_where := rtrim(v_upd_del_where, 'n') ; v_upd_del_where := rtrim(v_upd_del_where, 'a') ; -- 组装SQL v_sql := 'delete from '||quote_ident(v_table_change_rec_grp3.table_schema)||'.'||quote_ident(v_table_change_rec_grp3.table_name)||' where '|| v_upd_del_where ||';' ; -- raise notice '%', v_sql; update mq.table_change_rec_grp3_6 set consumed=true where current of curs1; return next v_sql; else -- raise notice 'I do not known how to deal this op: %', v_table_change_rec_grp3.op; end case; else close curs1; return next 'END;'; return; end if; fetch curs1 into v_table_change_rec_grp3; END LOOP; else -- raise notice 'no % queue table deal code in this function.', v_tablename; return; end case; end; $$ language plpgsql strict ; ``` 验证消息队列取数据的事务一致性 ``` pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 10 src ``` psql src ``` select mq.build_sql_grp1(1); -- min(xid)=max(xid) 取1个xid BEGIN; update public.grp1_tbl1 set id='131056',info='c0d5e77b1a25e9895579d54abf5a1fe1',crt_time='2016-02-10 22:23:25.327334' where id='131056' and info='65f46b8b12e5cdd35f1d95d51dbe0d96' and crt_time='2016-02-10 22:05:11.067228' ; update public.grp1_tbl2 set id='1487543',info='988bddbd620b6ebae7bb12ff1498be09',tbl1_id='235631',crt_time='2016-02-10 22:23:25.327334' where id='1487543' and info='4dcdeddd73928541f3f3991a5bb92239' and tbl1_id='235631' and crt_time='2 016-02-10 21:36:37.314308' ; update public.grp1_tbl1 set id='131057',info='8a7f1010881b8c6593022d11635219d9',crt_time='2016-02-10 22:23:25.327334' where id='131057' and info='c799ebdd5cc46fa8e889c5e7541d7d00' and crt_time='2016-02-10 22:04:57.90855' ; 
insert into public.grp1_tbl2(id,info,tbl1_id,crt_time) values('1487544','c519842ff4a0d6e8e7369e23bddef451','131057','2016-02-10 22:23:25.327334'); END; select mq.build_sql_grp1(4); -- min(xid)=max(xid) 取1个xid BEGIN; update public.grp1_tbl1 set id='548793',info='32e70c3483fe65c4cf178e14e7ff4c28',crt_time='2016-02-10 22:23:25.327503' where id='548793' and info='b68e03e7219b05e8f0b721772a665da8' and crt_time='2016-02-10 21:36:40.8851' ; update public.grp1_tbl2 set id='665971',info='e2163b17b141901cd755436393b065f6',tbl1_id='328299',crt_time='2016-02-10 22:23:25.327503' where id='665971' and info='911f8c54164052aa756a2d0ddcff5303' and tbl1_id='328299' and crt_time='2016-02-10 22:04:09.349097' ; update public.grp1_tbl1 set id='548794',info='92c9e6cf5bff1c0f8bb343045a99aca5',crt_time='2016-02-10 22:23:25.327503' where id='548794' and info='09b25e5bf65703fc8036b9c11fc851c5' and crt_time='2016-02-10 21:36:40.8851' ; update public.grp1_tbl2 set id='665972',info='fd988478cc4a2ef639231cdb8c12bd87',tbl1_id='328300',crt_time='2016-02-10 22:23:25.327503' where id='665972' and info='9c536cd4a631ab1bf819c2ab28351f4a' and tbl1_id='328300' and crt_time='2016-02-10 22:04:09.349097' ; END; select mq.build_sql_grp1(5); -- min(xid) <> max(xid) 不取最后一个xid BEGIN; update public.grp1_tbl1 set id='89941',info='41da03886622b5617ca640804edec45a',crt_time='2016-02-10 22:23:25.327111' where id='89941' and info='d2f3972c3a254cc740d48ff10521fc34' and crt_time='2016-02-10 22:10:41.274318' ; update public.grp1_tbl2 set id='930464',info='35d7e72cab05a5b77361b98605ce8a1f',tbl1_id='655832',crt_time='2016-02-10 22:23:25.327111' where id='930464' and info='ee54ab83230c477bf504cc9db772adbe' and tbl1_id='655832' and crt_time='2016-02-10 21:36:21.791644' ; update public.grp1_tbl1 set id='89942',info='70cdce347ab4c7e4fe9e07548e7031ca',crt_time='2016-02-10 22:23:25.327111' where id='89942' and info='403cff6b29b47adaa3177051f2503d86' and crt_time='2016-02-10 22:10:41.274318' ; update public.grp1_tbl2 set 
id='930465',info='4d5a312fe01ae145c6eeea4f3e5af046',tbl1_id='68321',crt_time='2016-02-10 22:23:25.327111' where id='930465' and info='61d352aa53dbb00b04bc036130daaff2' and tbl1_id='68321' and crt_time='2016-02-10 21:36:21.791644' ; END; select mq.build_sql_grp1(8); -- min(xid) <> max(xid) 不取最后一个xid BEGIN; insert into public.grp1_tbl1(id,info,crt_time) values('194416','bba5f43fce8b54a956429ac1b7ef7669','2016-02-10 22:23:25.327116'); update public.grp1_tbl2 set id='227485',info='4d2619f67c62c4e3ead797b89691273f',tbl1_id='280404',crt_time='2016-02-10 22:23:25.327116' where id='227485' and info='2e55602bb09999d3cdd807aa7316b88c' and tbl1_id='280404' and crt_time='2016-02-10 22:10:47.235621' ; insert into public.grp1_tbl1(id,info,crt_time) values('194417','50479922a568fa84a18886965df92248','2016-02-10 22:23:25.327116'); update public.grp1_tbl2 set id='227486',info='11d2d48473c0d0c61a7d377bfd7d8b24',tbl1_id='280405',crt_time='2016-02-10 22:23:25.327116' where id='227486' and info='041971b9a3ad5cacb68d6df4b4ef5b94' and tbl1_id='280405' and crt_time='2016-02-10 22:10:47.235621' ; END; select mq.build_sql_grp1(9); -- min(xid) <> max(xid) 不取最后一个xid BEGIN; update public.grp1_tbl1 set id='731671',info='da56e7cbf49a1d4dfd635be9413f8976',crt_time='2016-02-10 22:23:25.32763' where id='731671' and info='02b4badf7cc1e49ee08d31c0de7a5ac9' and crt_time='2016-02-10 22:04:43.545489' ; update public.grp1_tbl2 set id='1673381',info='43a208d276a24b3cf274acf81997d8e3',tbl1_id='338174',crt_time='2016-02-10 22:23:25.32763' where id='1673381' and info='b6a1343706c49c760f226e9280c27185' and tbl1_id='338174' and crt_time='2016-02-10 21:35:58.04798' ; update public.grp1_tbl1 set id='731672',info='d931c737f8032d246b9d7013857fea7b',crt_time='2016-02-10 22:23:25.32763' where id='731672' and info='aaad1256f701bb98c0d2c6ecc9ca5035' and crt_time='2016-02-10 22:10:47.520137' ; update public.grp1_tbl2 set id='1673382',info='55c07d91df8856b837e9791613e23c46',tbl1_id='338175',crt_time='2016-02-10 
22:23:25.32763' where id='1673382' and info='905a030545fb9912b95e6a06e9952945' and tbl1_id='338175' and crt_time='2016-02-10 21:35:58.04798' ; update public.grp1_tbl1 set id='656384',info='4a859cebf311b9efba3f2eb9a6f09d9d',crt_time='2016-02-10 22:23:25.32711' where id='656384' and info='acc22461e1179993e56558bb0b43d5e5' and crt_time='2016-02-10 22:04:26.242072' ; update public.grp1_tbl2 set id='1447281',info='c9ec438cf276c3baf4f440633bc9a63f',tbl1_id='446266',crt_time='2016-02-10 22:23:25.32711' where id='1447281' and info='3050833049ae73ebc6aaa43d7c74e854' and tbl1_id='446266' and crt_time='2016-02-10 21:24:59.28356' ; update public.grp1_tbl1 set id='656385',info='72662c014906a6f513e1caa605f94fab',crt_time='2016-02-10 22:23:25.32711' where id='656385' and info='dfa471e9a5818cd070df6f2593c07dff' and crt_time='2016-02-10 21:36:20.892822' ; update public.grp1_tbl2 set id='1447282',info='e9bd5c2ba44cbfffad67b91ef761abbc',tbl1_id='446267',crt_time='2016-02-10 22:23:25.32711' where id='1447282' and info='64f3242b7fd3949a07fa74bc342d9bc6' and tbl1_id='446267' and crt_time='2016-02-10 21:24:59.28356' ; END; select mq.build_sql_grp1(13); -- min(xid) <> max(xid) 不取最后一个xid BEGIN; update public.grp1_tbl1 set id='782912',info='c7909a2201657277731746397d237ef9',crt_time='2016-02-10 22:23:25.327133' where id='782912' and info='d783a9796860b1316411f5b258051858' and crt_time='2016-02-10 21:24:44.640954' ; update public.grp1_tbl1 set id='379581',info='f72224a3ba4cb557ea49994b16a3138b',crt_time='2016-02-10 22:23:25.327133' where id='379581' and info='7a7b1817f4aac88af359ba594ddb5924' and crt_time='2016-02-10 21:36:25.240159' ; update public.grp1_tbl2 set id='647936',info='ffb15cdf80106bb3593adf2ee60ccf30',tbl1_id='77106',crt_time='2016-02-10 22:23:25.327133' where id='647936' and info='8599d26d938651dfcfde78b4f86ffb9d' and tbl1_id='77106' and crt_time='2016-02-10 21:35:39.811541' ; insert into public.grp1_tbl2(id,info,tbl1_id,crt_time) 
values('1310498','57c717a8be1aaf2f45eaad8826233e83','782912','2016-02-10 22:23:25.327133'); insert into public.grp1_tbl1(id,info,crt_time) values('379582','d7ed641cee4485a94a6496a18de86c44','2016-02-10 22:23:25.327133'); update public.grp1_tbl1 set id='782913',info='d02f8d22d0f6c24d58ab13df5b3c9fba',crt_time='2016-02-10 22:23:25.327133' where id='782913' and info='5d6844b7d2dc5ec3143c932eae2ad36e' and crt_time='2016-02-10 21:24:44.640954' ; update public.grp1_tbl2 set id='647937',info='2e43cfcd9abf35d7e35ba94130555f61',tbl1_id='77107',crt_time='2016-02-10 22:23:25.327133' where id='647937' and info='60f4e42343aa30120afa3033c1905ed2' and tbl1_id='77107' and crt_time='2016-02-10 21:35:39.811541' ; update public.grp1_tbl2 set id='1310499',info='496e0a9ecc1a5d49592005eafdd5e68f',tbl1_id='106818',crt_time='2016-02-10 22:23:25.327133' where id='1310499' and info='aca3cb5217031c58db8640ec376e66d6' and tbl1_id='106818' and crt_time='2016-02-10 22:05:16.69726' ; update public.grp1_tbl1 set id='411867',info='17e6c6b8251f25e19c8cd234bceb1e06',crt_time='2016-02-10 22:23:25.327571' where id='411867' and info='99ae15ee891be0d2561d7068d1502570' and crt_time='2016-02-10 22:04:28.532467' ; update public.grp1_tbl2 set id='1358340',info='6975e14b065d06c78166fd84bd413919',tbl1_id='443296',crt_time='2016-02-10 22:23:25.327571' where id='1358340' and info='a7e98e8f6edc4c98b2a933e9dcfb1ee8' and tbl1_id='443296' and crt_time='2016-02-10 22:04:30.559512' ; update public.grp1_tbl1 set id='411868',info='4945ec0218c9c85406ab1e14dda51588',crt_time='2016-02-10 22:23:25.327571' where id='411868' and info='aa89bbdd62ee5821976ee3bffd8866ad' and crt_time='2016-02-10 22:10:39.064562' ; update public.grp1_tbl2 set id='1358341',info='4d99962abf04a45a85f6f729ff63cd25',tbl1_id='443297',crt_time='2016-02-10 22:23:25.327571' where id='1358341' and info='6a29504b044349981c1309842cd8cb8a' and tbl1_id='443297' and crt_time='2016-02-10 22:04:30.559512' ; END; ``` 连接到目标库, 创建复制表的结构. 
``` \c dest ``` 组1, 这两个表有外键关联, 在一个事务中操作, 在事务中的所有表的跟踪记录必须插入同一个记录表. ``` create table grp1_tbl1 (id int8 primary key, info text, crt_time timestamp); create table grp1_tbl2 ( id int8 primary key, tbl1_id int8 REFERENCES grp1_tbl1(id) DEFERRABLE INITIALLY DEFERRED, info text, crt_time timestamp ); ``` 组2, 这两个表有外键关联, 在一个事务中操作, 在事务中的所有表的跟踪记录必须插入同一个记录表. ``` create table grp2_tbl1 (id int8 primary key, info text, crt_time timestamp); create table grp2_tbl2 ( id int8 primary key, tbl1_id int8 REFERENCES grp2_tbl1(id) DEFERRABLE INITIALLY DEFERRED, info text, crt_time timestamp ); ``` 组3, 这两个表有外键关联, 在一个事务中操作, 在事务中的所有表的跟踪记录必须插入同一个记录表. ``` create table grp3_tbl1 (id int8 primary key, info text, crt_time timestamp); create table grp3_tbl2 ( id int8 primary key, tbl1_id int8 REFERENCES grp3_tbl1(id) DEFERRABLE INITIALLY DEFERRED, info text, crt_time timestamp ); ``` 例子1, 暴力同步 : 开始压测 ``` pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 100000 src ``` 触发器都创建好之后, 就可以导出数据了. 压测的同时, 将数据dump出来, 恢复到dest ``` pg_dump -F p -a -t grp1_tbl1 -t grp1_tbl2 -t grp2_tbl1 -t grp2_tbl2 -t grp3_tbl1 -t grp3_tbl2 -x src | psql dest -f - COPY 158048 COPY 164730 COPY 158068 COPY 165006 COPY 158147 COPY 164808 ``` 继续压测不要停 开始增量恢复, 首先使用单个事务复制的方式, 跳过重复部分(因为消费函数不管目标是否执行成功, 只要数据被取出即更新consumed=true) ``` while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp1(1)) to stdout;commit;' | psql dest -f - >/dev/null ; done while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp2(1)) to stdout;commit;' | psql dest -f - >/dev/null ; done while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp3(1)) to stdout;commit;' | psql dest -f - >/dev/null ; done ``` 确认跳过重复部分后, 使用批量增量复制 ``` while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp1(1000)) to stdout;commit;' | psql dest -f - 
>/dev/null ; done while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp2(1000)) to stdout;commit;' | psql dest -f - >/dev/null ; done while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp3(1000)) to stdout;commit;' | psql dest -f - >/dev/null ; done ``` 停止压测, 等待增量同步完成 校验数据 ``` psql src select sum(hashtext(t.*::text)) from grp1_tbl1 t; -163788004315 select sum(hashtext(t.*::text)) from grp1_tbl2 t; 311855736266 select sum(hashtext(t.*::text)) from grp2_tbl1 t; -1605268316207 select sum(hashtext(t.*::text)) from grp2_tbl2 t; -136992258088 select sum(hashtext(t.*::text)) from grp3_tbl1 t; 2375761278075 select sum(hashtext(t.*::text)) from grp3_tbl2 t; -388257824197 psql dest select sum(hashtext(t.*::text)) from grp1_tbl1 t; select sum(hashtext(t.*::text)) from grp1_tbl2 t; select sum(hashtext(t.*::text)) from grp2_tbl1 t; select sum(hashtext(t.*::text)) from grp2_tbl2 t; select sum(hashtext(t.*::text)) from grp3_tbl1 t; select sum(hashtext(t.*::text)) from grp3_tbl2 t; ``` 结果一致 (dest端各表的校验值与src端相同) 例子2, 温和同步, 利用快照功能导出, 可以精确跳过重复部分 : (先清除dest端的数据) 开始压测 ``` pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 100000 src ``` 连接到源库, 开启一个repeatable read事务并创建快照, 记录当前的事务状态; 在pg_dump使用该快照导出完成之前, 不要退出这个事务 ``` psql src src=# begin transaction isolation level repeatable read ; BEGIN src=# select txid_current_snapshot(); txid_current_snapshot ---------------------------------------------------------------------- 31004443:31004517:31004443,31004446,31004449,31004457,31004466,31004469,31004480,31004487,31004489,31004493,31004495,31004498,31004500,31004501,31004502,31004503,31004505,31004507,31004508,31004509,31004510,31004511,31004512,31004513,31004514,31004515 (1 row) ``` txid_current_snapshot()的输出格式为 xmin:xmax:xip_list, 即 最小未提交事务:最小未分配事务:创建快照时未提交的事务列表 ``` src=# select pg_export_snapshot(); pg_export_snapshot -------------------- 01DA7E30-1 (1 row) ``` 使用这个快照, 将数据dump出来, 恢复到dest ``` pg_dump --snapshot=01DA7E30-1 -F p -a -t grp1_tbl1 -t grp1_tbl2 -t grp2_tbl1 -t grp2_tbl2 -t 
grp3_tbl1 -t grp3_tbl2 -x src | psql dest -f - COPY 678854 COPY 865425 COPY 679293 COPY 866652 COPY 678734 COPY 865728 ``` 结束快照事务 ``` src=# end; COMMIT ``` 继续压测不要停 开始增量恢复前, 务必先确认pg_dump所用快照中的未提交事务都已结束(提交或回滚); 下面的查询返回0行, 即表示这些事务已全部结束 ``` postgres=# select * from txid_snapshot_xip(txid_current_snapshot()) t(xid) where t.xid in (31004443,31004446,31004449,31004457,31004466,31004469,31004480,31004487,31004489,31004493,31004495,31004498,31004500,31004501,31004502,31004503,31004505,31004507,31004508,31004509,31004510,31004511,31004512,31004513,31004514,31004515); xid ----- (0 rows) ``` 清除不需要恢复的跟踪记录 (即对快照可见、已包含在pg_dump导出数据中的事务: x_id小于xmin, 或者x_id在xmin与xmax之间且不在未提交事务列表中) ``` psql src src=# update mq.table_change_rec_grp1 set consumed =true where consumed=false and (x_id<31004443 or (x_id>=31004443 and x_id<31004517 and x_id not in (31004443,31004446,31004449,31004457,31004466,31004469,31004480,31004487,31004489,31004493,31004495,31004498,31004500,31004501,31004502,31004503,31004505,31004507,31004508,31004509,31004510,31004511,31004512,31004513,31004514,31004515))); UPDATE 699488 src=# update mq.table_change_rec_grp2 set consumed =true where consumed=false and (x_id<31004443 or (x_id>=31004443 and x_id<31004517 and x_id not in (31004443,31004446,31004449,31004457,31004466,31004469,31004480,31004487,31004489,31004493,31004495,31004498,31004500,31004501,31004502,31004503,31004505,31004507,31004508,31004509,31004510,31004511,31004512,31004513,31004514,31004515))); UPDATE 699404 src=# update mq.table_change_rec_grp3 set consumed =true where consumed=false and (x_id<31004443 or (x_id>=31004443 and x_id<31004517 and x_id not in (31004443,31004446,31004449,31004457,31004466,31004469,31004480,31004487,31004489,31004493,31004495,31004498,31004500,31004501,31004502,31004503,31004505,31004507,31004508,31004509,31004510,31004511,31004512,31004513,31004514,31004515))); UPDATE 699328 ``` 批量增量复制 ``` while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp1(1000)) to stdout;commit;' | psql dest -f - >/dev/null ; done while true; do psql 
src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp2(1000)) to stdout;commit;' | psql dest -f - >/dev/null ; done while true; do psql src -q -A -n -t -c 'begin work isolation level repeatable read; copy (select mq.build_sql_grp3(1000)) to stdout;commit;' | psql dest -f - >/dev/null ; done ``` 停止压测, 等待增量同步完成 校验数据 ``` psql src select sum(hashtext(t.*::text)) from grp1_tbl1 t; 566782435274 select sum(hashtext(t.*::text)) from grp1_tbl2 t; 119298584431 select sum(hashtext(t.*::text)) from grp2_tbl1 t; -794442717174 select sum(hashtext(t.*::text)) from grp2_tbl2 t; -390984534106 select sum(hashtext(t.*::text)) from grp3_tbl1 t; 2937942086023 select sum(hashtext(t.*::text)) from grp3_tbl2 t; 302638200204 psql dest select sum(hashtext(t.*::text)) from grp1_tbl1 t; select sum(hashtext(t.*::text)) from grp1_tbl2 t; select sum(hashtext(t.*::text)) from grp2_tbl1 t; select sum(hashtext(t.*::text)) from grp2_tbl2 t; select sum(hashtext(t.*::text)) from grp3_tbl1 t; select sum(hashtext(t.*::text)) from grp3_tbl2 t; ``` 结果一致