はじめに
この記事は CADDi Tech/Product Advent Calendar 2025 15 日目の記事です。
なんかいろいろやっている伊藤 (@amaya382) です。今回は非同期なデータ永続化処理で使われる Inbox/Outbox Pattern と呼ばれるデザインパターンの実装小ネタです。特に非同期メッセージの送受信を具体例として紹介していきます。
Inbox/Outbox Pattern はメッセージングシステムを利用する際の冪等性や Atomicity 担保に有用ですが、運用負荷やパフォーマンスリスクが発生しやすいものとなっています。この記事では Inbox/Outbox Pattern の有用性はそのままに、それらを軽減する方法を紹介します。Inbox/Outbox Pattern 導入の参考にどうぞ。
なお具体の方法は概ね AlloyDB for PostgreSQL を前提とします。PostgreSQL や類似の互換製品でも大抵再現できると思いますが試さないとわかりません。
Inbox/Outbox Pattern
本題に入る前に、簡単に Inbox/Outbox Pattern についておさらいしておきます。
Inbox Pattern
Inbox Pattern はメッセージの受信時に冪等性を担保するために用いられる手法です。多くのシーンでメッセージングシステムではパフォーマンスなどの観点から At Least Once、つまり同じメッセージが複数回送られることを許容する戦略が用いられます。この場合、同じメッセージが複数回来たときに受信側で冪等になるような処理が求められます。Inbox Pattern は受信したメッセージをデータベース (Inbox テーブル) に保持し、既に処理済みとなっていたら処理をスキップすることで冪等性を担保するアプローチです。この重複チェックのため、再送が想定される期間はメッセージを Inbox テーブルに保持し続ける必要があります。
sequenceDiagram
autonumber
participant Broker as Message Broker
participant App as Consumer App
participant DB as Database
Broker->>App: メッセージ受信 (MsgID: 123)
Note over App, DB: 冪等性担保のための重複排除
App->>DB: Inboxテーブルに MsgID: 123 があるか確認
alt 既に処理済み (Inboxに存在する)
App-->>Broker: ACK (何もしない)
Note right of App: 重複メッセージのためスキップ
else 新規メッセージ
rect rgb(240, 248, 255)
Note right of App: DBトランザクション開始
App->>DB: ビジネスロジック実行
App->>DB: Inboxテーブルに MsgID: 123 を保存
App-->>DB: COMMIT (確定)
end
App-->>Broker: ACK (処理完了通知)
end
Note over DB: 定期バッチ処理
DB->>DB: 古いInboxレコードを物理削除
Outbox Pattern
Outbox Pattern はデータベースへの更新とメッセージ送信を Atomic に行うために用いられる手法です。データ更新と連動するメッセージ送信を単純に行うと、いずれか一方だけが失敗し不整合となる可能性があります。Outbox Pattern ではこの不整合を回避するため、1 トランザクションでデータベースの更新と送信予定のメッセージの保存 (Outbox テーブル) を行い、その後非同期に Outbox テーブルからメッセージの送信を行います。送信が成功したメッセージは Outbox テーブルから削除できますが、トラブルシューティングやリプレイ (メッセージ送信の再現) のためにしばらく保持する運用が多く取られます。
sequenceDiagram
autonumber
participant Client
participant App as Application
participant DB as Database
participant Relay as Message Relay<br/>(Poller/Publisher)
participant Broker as Message Broker
Note over Client, DB: 1. データの整合性を担保するフェーズ
Client->>App: リクエスト (例: 注文確定)
rect rgb(240, 248, 255)
Note right of App: DBトランザクション開始
App->>DB: ビジネスデータの更新 (Ordersテーブル)
App->>DB: メッセージの保存 (Outboxテーブル)
App-->>DB: COMMIT (確定)
end
App-->>Client: レスポンス (成功)
Note over DB, Broker: 2. メッセージを送信するフェーズ (非同期)
loop 定期ポーリング
Relay->>DB: 未送信メッセージを取得
Relay->>Broker: メッセージをPublish (送信)
Broker-->>Relay: ACK (受信確認)
rect rgb(240, 248, 255)
Note right of Relay: ここで削除またはフラグ更新
Relay->>DB: Outboxレコードを削除 (or 処理済みに更新)
end
end
Note over DB: 定期バッチ処理 (フラグを使う場合)
DB->>DB: 古いOutboxレコードを物理削除
Inbox/Outbox Pattern の厄介なところ
Inbox Pattern、Outbox Pattern はメッセージングシステムを効果的に使うために便利ですが、少し厄介な運用がつきまといます。その仕組み上、データベース上に多数のメッセージをしばらく保持しなければなりません。そしてメッセージは増え続けるため、定期的に削除しないとストレージが肥大化しコストやパフォーマンス悪化を引き起こしてしまいます。またメッセージの件数が多い場合、単純に DELETE 文で削除するとデータベースの負荷が高くなりやす点にも注意しなければなりません。
パーティショニングを使った Inbox/Outbox Pattern の運用
今回は Inbox/Outbox Pattern で溜まるメッセージを自動化されたパーティションテーブルで管理することで、運用負荷とパフォーマンスリスクの軽減を同時に図ります。
概要
やることはとてもシンプルで、pg_partman で Inbox/Outbox テーブルを時系列パーティショニングし、pg_cron で時系列パーティションの自動作成・削除を設定するだけです。例えば Inbox/Outbox テーブルをインターバルを 1 日としてパーティショニングし、1 週間経った古いパーティションを削除するといった具合です。これらの処理はすべて自動化できるため運用負荷が低く、パーティション単位の削除となるためレコード単位の削除と比較して非常に軽量にできます。
使うツール
AlloyDB for PostgreSQL を前提としています。AlloyDB でも利用できる PostgreSQL Extension を利用します。
pg_partman
パーティションの作成・削除などのメンテナンスを簡単にしてくれる PostgreSQL Extension です。インターバルや保持期間などを設定しておくと簡単にパーティションを作成・削除することができるようになります。Inbox/Outbox テーブルのパーティション管理を簡単にするために利用します。
注意: pg_partman_bgw と呼ばれるパーティションメンテナンスをバックグラウンドで自動実行してくれるモジュールも提供されていますが、残念ながら AlloyDB for PostgreSQL では利用できません。
GitHub - pgpartman/pg_partman: Partition management extension for PostgreSQL
pg_cron
データベースのバックグラウンドで動作する Cron-based なジョブスケジューラーの PostgreSQL Extension です。任意のクエリを Cron 方式でスケジュール実行できます。AlloyDB for PostgreSQL で pg_partman_bgw が利用できないため、その代替として利用します。
GitHub - citusdata/pg_cron: Run periodic jobs in PostgreSQL
手順
ここでは大まかなセットアップ手順を紹介していきます。
Extension のセットアップ
pg_cron のインストール
database_flags に以下の設定を追加することで AlloyDB に pg_cron をインストールできます:
alloydb.enable_pg_cron='on'
pg_partman のインストール
AlloyDB の場合、pg_partman はデフォルトでインストールされているので何もしなくて良いです。
Extensions の有効化
あらかじめパーティションメンテナンスに利用するユーザーを準備してください。実行ユーザーに注意しつつ、以下で有効化できます。
-- 実行ユーザー: 特権ユーザー -- pg_cron の有効化 CREATE EXTENSION pg_cron; GRANT USAGE ON SCHEMA cron TO <メンテナンス実行ユーザー>;
-- 実行ユーザー: メンテナンス実行ユーザーを推奨 (以降これを前提としている) -- pg_partman の有効化 CREATE SCHEMA partman; -- partman 用スキーマの作成 CREATE EXTENSION pg_partman SCHEMA partman;
パーティションが置かれるスキーマのセットアップ
パーティションテーブルを置くスキーマをセットアップしていきます。
-- 実行ユーザー: 特権ユーザーを推奨 -- 未作成なら作る CREATE SCHEMA <パーティションテーブル用スキーマ>; -- 実行ユーザー: 特権ユーザーを推奨 -- メンテナンス実行時に必要な権限を付与 GRANT USAGE, CREATE ON SCHEMA <パーティションテーブル用スキーマ> TO <メンテナンス実行ユーザー>; -- 実行ユーザー: メンテナンス実行ユーザー -- 今後作成されるテーブルにアクセスする権限をアプリケーション用ユーザーに自動付与 ALTER DEFAULT PRIVILEGES IN SCHEMA <パーティションテーブル用スキーマ> GRANT SELECT, INSERT, DELETE, UPDATE ON TABLES TO <アプリケーション用ユーザー>;
パーティションテーブルのセットアップ
実際にパーティションを利用する Inbox/Outbox テーブルをセットアップしていきます。
-- 実行ユーザー: メンテナンス実行ユーザー -- テーブル定義は必要な形で定義すること。 -- パーティションキーを primary key に含める必要がある。 CREATE TABLE <パーティションテーブル用スキーマ>.<パーティションテーブル> ( "message_id" UUID NOT NULL, <パーティションキー> TIMESTAMPTZ(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT <primary_key> PRIMARY KEY ("message_id", <パーティションキー>) ) PARTITION BY RANGE (<パーティションキー>);
自動パーティションメンテナンスのセットアップ
cron.schedule は論理データベースごとに、partman.create_parent はパーティションテーブルごとに設定していきます。
-- (自動マイグレーションで扱いやすいように冪等な実装にしている) -- 実行ユーザー: メンテナンス実行ユーザー (これを実行したユーザーでメンテナンスが実行されるため) -- Schedule cron job only when it doesn't already exist DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM cron.job WHERE jobname = 'partman-maintenance-job' -- 任意の名前 ) THEN PERFORM cron.schedule( 'partman-maintenance-job', '@hourly', -- 失敗しても何度か実行されるように、パーティション単位より短いことが推奨されている 'CALL partman.run_maintenance_proc()' -- partman の実行コマンドがこれ ); END IF; END $$; -- 実行ユーザー: メンテナンス実行ユーザー -- Create partman parent only when it doesn't already exist DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM partman.part_config WHERE parent_table = <パーティションテーブル用スキーマ>.<パーティションテーブル> ) THEN PERFORM partman.create_parent( p_parent_table => <パーティションテーブル用スキーマ>.<パーティションテーブル>, p_control => <パーティションキー>, p_interval => <パーティション単位, e.g, '1 day'> ); END IF; END $$; UPDATE partman.part_config SET infinite_time_partitions = true, retention = <パーティションの保持期間, e.g., '7 days'>, retention_keep_table = false -- 保持期間を過ぎたパーティションを削除する場合は false WHERE parent_table = <パーティションテーブル用スキーマ>.<パーティションテーブル>;
監視
運用するとなると監視が気になるところですよね。
pg_cron/pg_partman のログは AlloyDB のログとしてみることができるため、そこにアラートなどを仕込むこともできます。またあまり運用には適していませんが、pg_cron で利用される cron.job_run_details テーブルを見ることでジョブの成否やエラーの内容などを確認することもできます。
補足: さらなるログ確認のためには pg_jobmon などの Extension が必要となりますが、残念ながら AlloyDB ではサポートされていません。
補足: PostgreSQL を使う場合
AlloyDB for PostgreSQL ではなく PostgreSQL を利用する場合はいくつか手順が変わります。概要だけ紹介しておきます。
- pg_cron
- pg_partman_bgw が使えるため不要になる
- pg_partman
- apt などを利用してホストに pg_partman をインストール
- postgresql.conf に pg_partman_bgw の設定を追加
shared_preload_libraries = 'pg_partman_bgw'pg_partman_bgw.intervalなど
おわりに
AlloyDB for PostgreSQL だと少し制約もありましたが、運用負荷もパフォーマンス懸念も小さくてはっぴー!
今年もいろんなポジションで人を探しているようです。私のいるチームもあるみたいですね。気になる人は Twitter の DM でもなんでも気軽にどうぞ!(お久しぶりでもはじめましてでも)