一句话解释这个功能,使用 postgresql logic decoding 感知数据库的变化,同步到其他数据源。

如果是同步到 postgresql 数据库, 直接使用 logic replication 。

如果是 elasitcsearch 这类异构数据源。就需要解析复制内容。

我这里使用 wal2json 这个插件。

 

1. 怎么使用的。

AWS RDS 的参数 必须要把  rds.logical_replication  改为 1

创建 replication slot

SELECT * FROM pg_create_logical_replication_slot('wal2json_rds', 'wal2json');

 

查看 replication slot

SELECT *  FROM pg_replication_slots;

 

使用  psycopg2 仿照 python-mysql-replication 这个库 开发了 psql 版本 python-psql-replication

具体能用的人,自然可以看出其实没有啥技术含量。当然也可以做的更好。

 

2. 有什么坑。

2.1

没有更新事务的个情况下 wal 一直在涨。 但是 slot 却不推进它的 confirmed_flush_lsn

参考刚刚使用这种方法的时候。我提的一个 stackoverflow 问题: 

解决办法: 我暂时使用评论里面的提到的 aws 的方法, 定期搞点事务出来

(当然是不影响正常数据,处理方式我自己觉得羞愧)

 

2.2

我遇见过即时  alter table test_table replica identity full 之后 ,

wal2json 解析出来的的 event["row"]["after_values"] 依旧没有包含所有的列。

这个线上最好不要 alter table test_table replica identity full 量大产生日志太多。

identity full 有个好处是。入 elasticsearch 的时候。可以不用 update 直接 index 了。

(如果之前出现错误没有index进去)

 

2.3

同步脚本如果已经追不上了。怎么调试。

select pg_current_wal_flush_lsn();

select pg_current_wal_insert_lsn();

select pg_current_wal_lsn();

selet  *  from  pg_replication_slots;

select
      slot_name,
      pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) as replicationSlotLag, 
      active 
from pg_replication_slots ;

 

观察各类 lsn 的值。

在不关心已有 wal 的数据情况, 和不想删除 slot 的情况下

最好是调试  psycopy2 中  start_replication 中的参数,看能否推进。

最粗暴的方式就是 直接删掉 slot 重来。


 

本文只是记录自己怎么干这件事的。

在没有深入理解 postgresql 的机制的情况下(羞愧),

这个方法暂时没有带来太多的处理负担。

需要鞭策自己真的深入理解。否则出了问题真的是抓瞎。

 


不得不反问自己。

在没有 在没有深入理解 postgresql 的情况下。怎么敢这个干?

答 : 因为在 mysql 那边也干过。

那是深入理解 mysql 了吗?

答 : 其实也有没有。(羞愧)


上面的思路形成:https://github.com/jiamo/pg_to_es

但 https://github.com/toluaina/pgsync/tree/master/pgsync  做的更好

 

jiamo post at 2021-02-23