雲計算

PostgreSQL 如何讓心跳永遠不死,支持半同步自動同步、異步升降級 – udf 心跳

PostgreSQL 如何讓心跳永遠不死,支持半同步自動同步、異步升降級 - udf 心跳

作者

digoal

日期

2019-01-30

標籤

PostgreSQL , 同步 , 半同步 , 流複製 , 心跳 , 自動降級 , 自動升級 , dblink , 異步調用


背景

在心跳時,通過自定義UDF,實現心跳永遠不被堵塞,並且支持更加當前的配置自動的進行同步、異步模式的升降級。實現半同步的功能。

UDF輸入

1、優先模式(同步、異步)

2、同步等待超時時間

當優先為同步模式時,假設當前為同步配置,如果備庫異常導致事務提交等待超過指定時間,則自動降級為異步。

當優先為異步模式時,假設當前為同步配置,自動降級為異步。

當優先為同步模式時,假設當前為異步配置,如果備庫恢復到streaming模式,自動升級為同步。

使用技術點:

1、alter system

2、reload conf

3、cancle backend

4、dblink 異步調用

心跳UDF邏輯

判斷當前實例狀態  
  
  只讀  
  
    退出  
  
  讀寫  
  
    判斷當前事務模式   
  
      異步  
  
        發心跳  
  
        優先模式是什麼  
  
          異步  
  
            退出  
  
          同步  
  
            判斷是否需要升級  
  
              升級  
  
              退出  
  
  
      同步  
  
        消耗異步消息  
  
        發遠程心跳  
  
        查詢是否超時  
  
          降級  
  
        否則  
  
          消耗異步消息  
  
        優先模式是什麼  
  
        異步  
  
          降級  
  
          退出  
  
        同步  
  
          退出  

設計

1、當前postgresql.conf配置

synchronous_commit='remote_write';  
synchronous_standby_names='*';  

表示同步模式。

2、心跳錶設計

create table t_keepalive(id int primary key, ts timestamp, pos pg_lsn);  

3、心跳寫入方法

insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos;  

4、創建一個建立連接函數,不報錯

create or replace function conn(        
  name,   -- dblink名字        
  text    -- 連接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;      

5、更加以上邏輯創建心跳UDF。

create or replace function keepalive (  
  prio_commit_mode text,    
  tmout interval  
) returns t_keepalive as $$  
declare  
  res1 int;  
  res2 timestamp;  
  res3 pg_lsn;  
  commit_mode text;  
  conn text := format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database());  
  conn_altersys text := format('hostaddr=%s port=%s user=%s dbname=%s', '127.0.0.1', current_setting('port'), current_user, current_database());  
  app_prefix_stat text := 'keepalive_dblink';  
begin  
  if prio_commit_mode not in ('sync','async') then  
    raise notice 'prio_commit_mode must be [sync|async]';  
    return null;  
  end if;  
  
  show synchronous_commit into commit_mode;  
  
  create extension IF NOT EXISTS dblink;  
  
  -- 判斷當前實例狀態  
  if pg_is_in_recovery()   
  
  -- 只讀  
  then  
    raise notice 'Current instance in recovery mode.';  
    return null;  
      
  -- 讀寫  
  else  
  
    -- 判斷當前事務模式   
    if commit_mode in ('local','off')  
  
    -- 異步  
    then  
  
      -- 發心跳  
      insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos into res1,res2,res3;  
  
      -- 優先模式是什麼  
      if prio_commit_mode='async'   
  
      -- 異步  
      then  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 判斷是否需要升級  
        perform 1 from pg_stat_replication where state='streaming' limit 1;  
        if found  
  
        -- 升級  
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=remote_write', true);   
          perform pg_reload_conf();   
  
          -- 退出  
          return row(res1,res2,res3)::t_keepalive;  
        end if;  
  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
  
  
    -- 同步  
    else  
  
      -- 消耗異步消息  
      perform conn(app_prefix_stat,  conn||app_prefix_stat);     
      perform t from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
  
      -- 發遠程心跳  
      perform dblink_send_query(app_prefix_stat, $_$ insert into t_keepalive values (1,now(),pg_current_wal_lsn()) on conflict (id) do update set ts=excluded.ts,pos=excluded.pos returning id,ts,pos $_$);    
  
      -- 查詢是否超時  
      <<ablock>>  
      loop  
        perform pg_sleep(0.2);  
  
        perform 1 from pg_stat_activity where application_name=app_prefix_stat and state='idle' limit 1;  
        -- 未超時  
        if found then  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'no timeout';  
          exit ablock;  
        end if;  
            
        perform 1 from pg_stat_activity where wait_event='SyncRep' and application_name=app_prefix_stat and clock_timestamp()-query_start > tmout limit 1;  
        -- 降級  
        if found then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
          select id,ts,pos into res1,res2,res3 from dblink_get_result(app_prefix_stat, false) as t(id int, ts timestamp, pos pg_lsn);  
          raise notice 'timeout';  
          exit ablock;  
        end if;  
            
        perform pg_sleep(0.2);  
      end loop;  
  
      -- 優先模式是什麼  
      if prio_commit_mode='async'   
  
      -- 異步  
      then  
        show synchronous_commit into commit_mode;  
        -- 降級  
        if commit_mode in ('on','remote_write','remote_apply')   
        then  
          perform dblink_exec(conn_altersys, 'alter system set synchronous_commit=local', true);   
          perform pg_reload_conf();  
          perform pg_cancel_backend(pid) from pg_stat_activity where wait_event='SyncRep';  
        end if;  
              
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
  
      -- 同步  
      else  
  
        -- 退出  
        return row(res1,res2,res3)::t_keepalive;  
      end if;  
          
    end if;  
  
  end if;  
end;  
$$ language plpgsql strict;  

測試

1、當前為同步模式

postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  

2、人為關閉從庫,心跳自動將數據庫改成異步模式,並通知所有等待中會話。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:39.800829 | 23/9501D5F8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 local  
(1 row)  

3、恢復從庫,心跳自動將數據庫升級為優先sync模式。

postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:48:47.329119 | 23/9501D6E8  
(1 row)  
  
postgres=# select * from keepalive ('sync','5 second');  
NOTICE:  extension "dblink" already exists, skipping  
NOTICE:  no timeout  
 id |             ts             |     pos       
----+----------------------------+-------------  
  1 | 2019-01-30 00:49:11.991855 | 23/9501E0C8  
(1 row)  
  
postgres=# show synchronous_commit ;  
 synchronous_commit   
--------------------  
 remote_write  
(1 row)  

小結

在心跳時,通過自定義UDF,實現心跳永遠不被堵塞,並且支持更加當前的配置自動的進行同步、異步模式的升降級。實現半同步的功能。

UDF輸入

1、優先模式(同步、異步)

2、同步等待超時時間

當優先為同步模式時,假設當前為同步配置,如果備庫異常導致事務提交等待超過指定時間,則自動降級為異步。

當優先為異步模式時,假設當前為同步配置,自動降級為異步。

當優先為同步模式時,假設當前為異步配置,如果備庫恢復到streaming模式,自動升級為同步。

使用技術點:

1、alter system

2、reload conf

3、cancle backend

4、dblink 異步調用

使用心跳實現半同步,大大簡化了整個同步、異步模式切換的流程。當然如果內核層面可以實現,配置幾個參數,會更加完美。

參考

dblin 異步

《PostgreSQL 數據庫心跳(SLA(RPO)指標的時間、WAL SIZE維度計算)》

《PostgreSQL 雙節點流複製如何同時保證可用性、可靠性(rpo,rto) - (半同步,自動降級方法實踐)》

免費領取阿里雲RDS PostgreSQL實例、ECS虛擬機

大量阿里雲PG解決方案: 任意維度實時圈人; 時序數據實時處理; 時間、空間、業務 多維數據實時透視; 獨立事件相關性分析; 海量關係實時圖式搜索; 社交業務案例; 流式數據實時處理案例; 物聯網; 全文檢索; 模糊、正則查詢案例; 圖像識別; 向量相似檢索; 數據清洗、採樣、脫敏、批處理、合併; GIS 地理信息空間數據應用; 金融業務; 異步消息應用案例; 海量數據 冷熱分離; 倒排索引案例; 海量數據OLAP處理應用;

德哥的 / digoal's PostgreSQL文章入口 - 努力做成PG資源最豐富的個人blog

德哥的微信 / digoal's wechat

Leave a Reply

Your email address will not be published. Required fields are marked *