PostgreSQL 如何讓心跳永遠不死,支持半同步自動同步、異步升降級 - udf 心跳
作者
digoal
日期
2019-01-30
標籤
PostgreSQL , 同步 , 半同步 , 流複製 , 心跳 , 自動降級 , 自動升級 , dblink , 異步調用
背景
在心跳時,通過自定義UDF,實現心跳永遠不被堵塞,並且支持更加當前的配置自動的進行同步、異步模式的升降級。實現半同步的功能。
UDF輸入
1、優先模式(同步、異步)
2、同步等待超時時間
當優先為同步模式時,假設當前為同步配置,如果備庫異常導致事務提交等待超過指定時間,則自動降級為異步。
當優先為異步模式時,假設當前為同步配置,自動降級為異步。
當優先為同步模式時,假設當前為異步配置,如果備庫恢復到streaming模式,自動升級為同步。
使用技術點:
1、alter system
2、reload conf
3、cancle backend
4、dblink 異步調用
心跳UDF邏輯
判斷當前實例狀態
只讀
退出
讀寫
判斷當前事務模式
異步
發心跳
優先模式是什麼
異步
退出
同步
判斷是否需要升級
升級
退出
同步
消耗異步消息
發遠程心跳
查詢是否超時
降級
否則
消耗異步消息
優先模式是什麼
異步
降級
退出
同步
退出
設計
1、當前postgresql.conf配置
synchronous_commit='remote_write';
synchronous_standby_names='*';
表示同步模式。
2、心跳錶設計
create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);
3、心跳寫入方法
insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;
4、創建一個建立連接函數,不報錯
create or replace function conn(
name, -- dblink名字
text -- 連接串,URL
) returns void as $$
declare
begin
perform dblink_connect($1, $2);
return;
exception when others then
return;
end;
$$ language plpgsql strict;
5、更加以上邏輯創建心跳UDF。
create or replace function keepalive (
prio_commit_mode text,
tmout interval
) returns t_keepalive as $$
declare
res1 int;
res2 timestamp;
res3 pg_lsn;
commit_mode text;
conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());
conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());
app_prefix_stat text := 'keepalive_dblink';
begin
if prio_commit_mode not in ('sync','async') then
raise notice 'prio_commit_mode must be [sync|async]';
return null;
end if;
show synchronous_commit into commit_mode;
create extension IF NOT EXISTS dblink;
-- 判斷當前實例狀態
if pg_is_in_recovery()
-- 只讀
then
raise notice 'Current instance in recovery mode.';
return null;
-- 讀寫
else
-- 判斷當前事務模式
if commit_mode in ('local','off')
-- 異步
then
-- 發心跳
insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;
-- 優先模式是什麼
if prio_commit_mode='async'
-- 異步
then
-- 退出
return row(res1,res2,res3)::t_keepalive;
-- 同步
else
-- 判斷是否需要升級
perform 1 from pg_stat_replication where state='streaming' limit 1;
if found
-- 升級
then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);
perform pg_reload_conf();
-- 退出
return row(res1,res2,res3)::t_keepalive;
end if;
return row(res1,res2,res3)::t_keepalive;
end if;
-- 同步
else
-- 消耗異步消息
perform conn(app_prefix_stat, conn||app_prefix_stat);
perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
-- 發遠程心跳
perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);
-- 查詢是否超時
<<ablock>>
loop
perform pg_sleep(0.2);
perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;
-- 未超時
if found then
select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
raise notice 'no timeout';
exit ablock;
end if;
perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;
-- 降級
if found then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);
perform pg_reload_conf();
perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';
select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);
raise notice 'timeout';
exit ablock;
end if;
perform pg_sleep(0.2);
end loop;
-- 優先模式是什麼
if prio_commit_mode='async'
-- 異步
then
show synchronous_commit into commit_mode;
-- 降級
if commit_mode in ('on','remote_write','remote_apply')
then
perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);
perform pg_reload_conf();
perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';
end if;
-- 退出
return row(res1,res2,res3)::t_keepalive;
-- 同步
else
-- 退出
return row(res1,res2,res3)::t_keepalive;
end if;
end if;
end if;
end;
$$ language plpgsql strict;
測試
1、當前為同步模式
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
remote_write
(1 row)
2、人為關閉從庫,心跳自動將數據庫改成異步模式,並通知所有等待中會話。
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
NOTICE: timeout
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8
(1 row)
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
local
(1 row)
3、恢復從庫,心跳自動將數據庫升級為優先sync模式。
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8
(1 row)
postgres=# select * from keepalive ('sync','5 second');
NOTICE: extension "dblink" already exists, skipping
NOTICE: no timeout
id | ts | pos
----+----------------------------+-------------
1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8
(1 row)
postgres=# show synchronous_commit ;
synchronous_commit
--------------------
remote_write
(1 row)
小結
在心跳時,通過自定義UDF,實現心跳永遠不被堵塞,並且支持更加當前的配置自動的進行同步、異步模式的升降級。實現半同步的功能。
UDF輸入
1、優先模式(同步、異步)
2、同步等待超時時間
當優先為同步模式時,假設當前為同步配置,如果備庫異常導致事務提交等待超過指定時間,則自動降級為異步。
當優先為異步模式時,假設當前為同步配置,自動降級為異步。
當優先為同步模式時,假設當前為異步配置,如果備庫恢復到streaming模式,自動升級為同步。
使用技術點:
1、alter system
2、reload conf
3、cancle backend
4、dblink 異步調用
使用心跳實現半同步,大大簡化了整個同步、異步模式切換的流程。當然如果內核層面可以實現,配置幾個參數,會更加完美。
參考
dblin 異步
《PostgreSQL 數據庫心跳(SLA(RPO)指標的時間、WAL SIZE維度計算)》
《PostgreSQL 雙節點流複製如何同時保證可用性、可靠性(rpo,rto) - (半同步,自動降級方法實踐)》