電気ひつじ牧場

技術メモ

自作RDBにRaftを実装して分散データベースにしてみた

はじめに

前回・前々回の記事ではDatabase Design and Implementation: Second EditionをもとにGoで自作RDB simpledb-go を実装し、B-treeインデックスによる読み取り性能の向上を確認しました。

github.com

今回はこのDBにRaftコンセンサスアルゴリズムを実装し、複数ノードで動く分散データベースに拡張してみます。

これまで作ってきたシングルノードのRDBでもデータの一貫性はありますが、そのノードが死んだら終わりです。単純にノードを増やすだけでは、ノード間でデータがずれて一貫性が崩れてしまいます。Raftを使った分散DBでは、過半数のノードによる合意を通じて全ノードのログを同じ順序に保つことで、一貫性を維持したままノード障害への耐性を得られます。

Raft実装はetcdやHashiCorpによる既存ライブラリもありますが、内部を理解するためAIの力を大いに借りつつsimpledb-goの中に実装しました。以降長々と解説が続きますが、動いている様子が見たい方は実際に動かしてみるまで飛んでください。

Raftについて

Raftは分散合意アルゴリズムの一種で、複数のノードがログの適用順序について合意し、全ノードの状態を一致させる仕組みです(論文)。

分散システムでは、書き込みを受け付ける窓口が複数あると、同時に異なる書き込みが入って状態が食い違うリスクがあります。Raftではこの問題を、書き込みを受け付けるノードを1台(リーダー)に限定し、他のノード(フォロワー)はリーダーから複製されたログを受け取って再生するだけ、という構造で解決しています。しかしリーダーが1台だと、そのノードが落ちたときにシステムが止まってしまいます。そこでRaftには、リーダーが死んだときに残ったノードの中から新しいリーダーを選び直すリーダー選挙の仕組みがあります。これらのログ複製とリーダー選出を、データの不整合が起こらないように安全に実行するのがRaftアルゴリズムの肝となります。

3つの状態

各ノードはリーダー、フォロワー、候補者の3つの状態を行き来します。

リーダー

クラスタ全体の司令塔です。クライアントからの書き込みを受け付け、自分のログに追記したうえで全フォロワーにそのログを複製します。自身を含めて過半数のノードがそのログを持てば、そのエントリはコミット済みとみなされ実データに適用されます。全ノードの応答を待たず過半数で十分とすることで、一部のノードが落ちていてもシステムを停止させずに済みます。また、定期的にハートビートという軽量なメッセージを送って自分の生存をフォロワーに伝えます。

フォロワー

リーダーからのログやハートビートを受け取り、指示通りにログを適用します。一定時間リーダーからの通信が途絶えると、リーダーに障害が起きた可能性があると判断して候補者に昇格します。

候補者

リーダー選挙に立候補した状態です。自分が持つtermという任期番号を表す値をインクリメントし、他のノードに投票を呼びかけます。過半数の票を獲得できればリーダーに昇格し、獲得できなければフォロワーに戻って次の選挙を待ちます。

障害が起こると

障害のパターンに応じて振る舞いも変わります。代表的なものを紹介します。

リーダー障害

リーダーが落ちると、フォロワーはハートビートが途絶えたことで障害を検知します。するといずれかのフォロワーが候補者に昇格して選挙を開始し、自己投票も含めて過半数の票を得るとそのノードが新しいリーダーになります。このとき、投票には「候補者のログが自分のログと同等以上に新しいこと」という条件があり、ログが古いノードは票を集められません。コミット済みのエントリは過半数のノードに複製されているため、ログが古い候補者は過半数の票を得られず、新リーダーは必ずコミット済みエントリをすべて持つノードから選ばれます。つまり、データが失われることはありません。

フォロワー障害

フォロワーが落ちた場合、リーダーは残りのノードで過半数を確保できる限り書き込みを継続できます。3台構成なら1台のフォロワー障害に耐えられます。障害から復帰したフォロワーは、リーダーからのメッセージで自分のログが遅れていることが検出され、不足分が自動的に送られてきて追いつきます。

ネットワークパーティション

ノード自体は生きているが、ネットワークの分断によりクラスタが2つのグループに分かれてしまうケースです。たとえば3台構成でリーダーとフォロワー2台に分断された場合、リーダー側は過半数を確保できないため書き込みが停止します。一方フォロワー側は過半数を持つため新リーダーを選出して書き込みを継続できます。分断が解消されると、旧リーダーは新リーダーのより大きなterm(後述)を受け取ってフォロワーに降格し、クラスタは再び一つにまとまります。このように、Raftではどちらのパーティションにも過半数がなければ書き込みが止まり、過半数がある側だけが継続できるため、データの不整合は起きません。

term

ここで重要なのがtermという概念です。termは論理的な時計のようなもので、選挙のたびにインクリメントされます。どのリーダーの時代に書かれたエントリかを識別するのに使われるほか、古いリーダーの排除にも役立ちます。たとえばネットワーク分断により一時的に到達不能になっていた旧リーダーが復帰したとき、自分より大きなtermを持つノードからメッセージを受け取ると、自分は古い時代のリーダーだと認識してフォロワーに降格します。これにより、クラスタにリーダーが2台同時にコミットをできる状態(split brain)を防いでいます。

simpledb-goでの設計方針

今回はWALをトランザクション単位でまとめてRaftログとして転送する方式を採用しました。

"The Log is the Database" という言葉がある通り、データベースの状態はログを最初から順に再生すれば完全に再構築できます。つまりログさえ送ればフォロワーは同じ状態を再現できるわけです。具体的には、トランザクションのコミット時にそのトランザクションに含まれるすべてのWALレコード(SetIntやSetStringといった変更操作)をまとめて1つのRaftログエントリとして投入します。こうすることで、フォロワーへの複製がトランザクション単位でアトミックになります。1つのRaftログは全体がコミットされるかされないかの二択なので、トランザクションの変更が中途半端に複製されることはありません。

Raftログエントリの構造

各Raftログエントリは以下の3つの情報を持ちます。

  • Index: ログ内の通し番号
  • Term: そのエントリが書かれた時のリーダーのTerm
  • Data: 実際のデータ。simpledb-goではトランザクション内のWALレコード群(データファイル中のブロック番号、オフセット、書き込み前/後の値)をシリアライズしたもの

IndexとTermはログの一貫性チェックやコミット判定に使われます。これらの役割は実装セクションで詳しく見ていきます。

ローカルWALとの関係

Raftのログはコミットされた変更の履歴を正しい順序で持ちます。では、Raftログを全ノードが持つようになればローカルのWALは不要かというと、そうではありません。

まず、Raftログエントリはコミット時に初めて生成されるため、コミット前のトランザクションについてはそもそもRaftログが作られません。一方でバッファプールが一杯になると、バッファマネージャはコミット前であってもダーティページをディスクに追い出すことがあります。この状態でクラッシュやRaft合意の失敗が起きると、コミットされていない中途半端な変更がディスクに残ってしまいます。これを元に戻すには変更前の値を記録したローカルWALが必要であり、Raftログでは代替できません。

加えて、ダーティページをディスクに書く前に対応するWALレコードを先にディスクへ書いておかないと、ページだけ書かれてWALが失われるケースでリカバリが不可能になります。

こうした理由から、Raftを導入してもリーダーではローカルWALを廃止せず残しています。

一方フォロワーはトランザクションを自分で実行しないため、コミット前の巻き戻すべきデータがそもそも存在しません。フォロワーのリカバリはRaftログからの再適用で完結します。ではフォロワーが適用中にクラッシュし、1つのRaftログに含まれるWALレコードが中途半端に適用された状態になったらどうなるでしょうか。その場合でも各WALレコードの操作は「ブロックZのオフセットYに値Xを書く」という冪等な操作なので、再起動後にリトライのため送られてきた同じRaftログを最初から再適用しても結果は同じになります。

すなわち、リーダーはローカルWALによる巻き戻しで、フォロワーはRaftログからの冪等な再適用で、それぞれトランザクションの原子性が保証されることになります。こういった理由から、リーダーではローカルWALを扱いフォロワーでは扱わないという設計にしています。

実装

実装は dbraft パッケージにまとめています。常駐するgoroutineは2つだけで、1つはフォロワー、候補者、リーダーの状態マシンを駆動するメインループ、もう1つは他ノードからのRPCを受信するループです。

メインループは非常にシンプルです。

func (n *RaftNode) run() {
    for {
        select {
        case <-n.stopCh:
            return
        default:
        }
        switch n.role {
        case Follower:
            n.runFollower()
        case Candidate:
            n.runCandidate()
        case Leader:
            n.runLeader()
        }
    }
}

ノードの現在の役割に応じて対応する関数を呼び出すだけです。各関数が返ると再び switch に戻り、役割が変わっていればそちらの関数が呼ばれます。

リーダー選出

まず正常ケースから見ていきます。3台のノード(Node1, Node2, Node3)があるとして、Node1がリーダー選出に成功するまでの流れは以下の通りです。

sequenceDiagram
    participant N1 as Node1 (Candidate)
    participant N2 as Node2 (Follower)
    participant N3 as Node3 (Follower)

    Note over N1: 選挙タイムアウト発火<br/>term=1, Candidateに昇格<br/>自分に投票(1票目)
    N1->>N2: RequestVote(term=1, lastLogIndex, lastLogTerm)
    N1->>N3: RequestVote(term=1, lastLogIndex, lastLogTerm)
    Note over N2: 候補者のログは自分と同等以上に新しい<br/>→投票OK
    N2-->>N1: VoteGranted=true
    Note over N3: 候補者のログは自分と同等以上に新しい<br/>→投票OK
    N3-->>N1: VoteGranted=true
    Note over N1: 過半数(3/3)獲得<br/>Leaderに昇格
    N1->>N2: AppendEntries(heartbeat)
    N1->>N3: AppendEntries(heartbeat)

全ノードはフォロワーとして起動します。フォロワーはリーダーからのハートビートを待ちますが、一定時間届かなければリーダー不在と判断します。この待ち時間が選挙タイムアウトです。各ノードの選挙タイムアウトにはランダムな揺らぎがあるため、最初にタイムアウトしたノードが候補者に昇格して選挙を開始します。実装は以下の通りです。

func (n *RaftNode) runFollower() {
    for {
        timeout := n.randomElectionTimeout()
        select {
        case <-n.stopCh:
            return
        case <-n.resetElectionCh:
        case <-time.After(timeout):
            n.mu.Lock()
            if n.role == Follower {
                n.role = Candidate
            }
            n.mu.Unlock()
            return
        }
    }
}

randomElectionTimeout() は1000ms〜2000msのランダムな値を返します。この間にリーダーからのハートビートが来なければ(resetElectionCh に何も届かなければ)、候補者に昇格します。タイムアウトにランダム性を持たせているのは、複数ノードが同時に候補者になって票が割れるのを防ぐためです。

候補者に昇格したノードはtermをインクリメントし、自分に投票してから他の全ノードにRequestVoteを送ります。

func (n *RaftNode) runCandidate() {
    n.mu.Lock()
    n.currentTerm++
    n.votedFor = n.id
    currentTerm := n.currentTerm
    lastLogIndex := n.log.LastIndex()
    lastLogTerm := n.log.LastTerm()
    n.persistHardStateLocked()
    n.mu.Unlock()

    votes := 1
    voteCh := make(chan bool, len(n.peers))
    for _, peer := range n.peers {
        go func(peer string) {
            // 投票の呼びかけ
            resp, err := n.transport.RequestVote(peer, &RequestVoteRequest{
                Term:         currentTerm,
                CandidateID:  n.id,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
            })
            // ...
            voteCh <- resp.VoteGranted
        }(peer)
    }

    total := len(n.peers) + 1
    needed := total/2 + 1
    responded := 0
    electionTimer := time.After(n.randomElectionTimeout())
    for {
        select {
        case granted := <-voteCh:
            responded++
            if granted {
                votes++
            }
            if votes >= needed {
                n.mu.Lock()
                if n.currentTerm == currentTerm && n.role == Candidate {
                    n.becomeLeaderLocked()
                }
                n.mu.Unlock()
                return
            }
            if responded == len(n.peers) {
                // 過半数に届かず落選
                n.mu.Lock()
                n.role = Follower
                n.mu.Unlock()
                return
            }
        case <-electionTimer:
            return
        }
    }
}

persistHardStateLocked() はcurrentTermとvotedForをディスクに永続化することで、クラッシュ後に再起動しても同じtermで二重投票してしまうのを防いでいます。

投票の応答はchannelで集約し、過半数(total/2 + 1)に達したらリーダーに昇格します。昇格時には n.currentTerm == currentTerm && n.role == Candidate を再確認しています。これは投票の応答を待っている間に、より大きなtermを持つノードからのメッセージでフォロワーに降格している可能性があるためです。全ピアから応答が返っても過半数に届かなければフォロワーに戻ります。選挙タイムアウトが先に発火した場合も関数を抜け、メインループで再び候補者として選挙をやり直します。

投票リクエストの処理

投票を受けるフォロワー側の処理も見ておきます。

// 投票リクエストに答える
func (n *RaftNode) HandleRequestVote(req *RequestVoteRequest, resp *RequestVoteResponse) error {
    resp.Term = n.currentTerm
    resp.VoteGranted = false

    if req.Term < n.currentTerm {
        // 自分より古いタームの候補者には投票しない
        return nil
    }

    if req.Term > n.currentTerm {
        // 自分より新しいタームの候補者がいればfollowerになる(votedForもリセットされる)
        n.stepDownLocked(req.Term)
    }

    if n.votedFor == "" || n.votedFor == req.CandidateID {
        lastLogTerm := n.log.LastTerm()
        lastLogIndex := n.log.LastIndex()
        logOk := req.LastLogTerm > lastLogTerm ||
            (req.LastLogTerm == lastLogTerm && req.LastLogIndex >= lastLogIndex)
        if logOk {
            n.votedFor = req.CandidateID
            n.persistHardStateLocked()
            resp.VoteGranted = true
            n.resetElection()
        }
    }

    resp.Term = n.currentTerm
    return nil
}

まず、自分より古いtermの候補者には即座に拒否を返します。自分より新しいtermであればフォロワーに降格してvotedForをリセットします。

そのうえで、そのtermでまだ誰にも投票していない(または同じ候補者に投票済みである)ことを確認し、候補者の持つログが自分のログと同等以上に新しいかをチェックします。比較ルールは、最後のログエントリのTermが大きい方が新しく、Termが同じならIndexが大きい方が新しい、というものです。条件を満たせば投票し、votedForを永続化して同じtermで二重投票しないようにします。同時に選挙タイマーをリセットして、投票したばかりなのに自分が候補者に昇格してしまうのを防ぎます。

投票時にこれらの条件があることで、コミット済みのエントリを持たない候補者がリーダーになるのを防いでいます。古いログしか持っていないノードがリーダーになると、コミット済みのデータが失われてしまうからです。

投票で失敗するケース

次に、投票がうまくいかないケースです。たとえばネットワーク分断により一時的に孤立していたノードが復帰した直後を考えます。孤立中にtermだけが進んでいても、そのノードが持っているログは古いままなので、他のノードから投票を拒否されます。

sequenceDiagram
    participant N1 as Node1 (Candidate)
    participant N2 as Node2 (Follower)
    participant N3 as Node3 (Follower)

    Note over N1: 選挙タイムアウト発火<br/>term=5, ログが古い
    N1->>N2: RequestVote(term=5, lastLogTerm=2, lastLogIndex=3)
    N1->>N3: RequestVote(term=5, lastLogTerm=2, lastLogIndex=3)
    Note over N2: 自分のlastLogTerm=4 > 2<br/>候補者のログが古い
    N2-->>N1: VoteGranted=false
    Note over N3: 自分のlastLogTerm=4 > 2<br/>候補者のログが古い
    N3-->>N1: VoteGranted=false
    Note over N1: 過半数の票を得られず<br/>Followerに戻る

Node1は過半数の票を得られずフォロワーに戻ります。その後、ログが最新のノード(例えばNode2)が候補者になって選挙に勝ち、新リーダーからログの複製を受け取ることでNode1も最新の状態に追いつきます。

なお、選挙中はリーダーが不在のため、クライアントからの書き込みは受け付けられずエラーになります。

データ複製のシーケンス

リーダーが決まったら、クライアントからの書き込みを処理します。正常ケースの流れは以下の通りです。

sequenceDiagram
    participant C as Client
    participant L as Leader
    participant F1 as Follower1
    participant F2 as Follower2

    Note over L: 初期状態<br/>nextIndex[F1]=1, nextIndex[F2]=1<br/>matchIndex[F1]=0, matchIndex[F2]=0

    C->>L: INSERT INTO t VALUES(...)
    Note over L: Transaction実行<br/>バッファ上で変更<br/>WALレコードをリストに蓄積
    Note over L: Commit開始<br/>RaftLog.Append(entry) → index=1

    L->>F1: entries=[index=1], leaderCommit=0
    L->>F2: entries=[index=1], leaderCommit=0
    F1-->>L: Success=true
    Note over L: matchIndex[F1]=1
    F2-->>L: Success=true
    Note over L: matchIndex[F2]=1
    Note over L: 過半数(3/3)が index=1 を保持<br/>commitIndex=1 に更新<br/>データに適用
    L-->>C: OK

    Note over L,F2: --- 100msごとのtick ---

    L->>F1: entries=[], leaderCommit=1
    L->>F2: entries=[], leaderCommit=1
    Note over F1: leaderCommit=1を受け取り<br/>commitIndex=1に更新<br/>データに適用
    Note over F2: leaderCommit=1を受け取り<br/>commitIndex=1に更新<br/>データに適用

リーダーはまずトランザクションをローカルで実行します。SetInt/SetStringなどが呼ばれるたびにバッファ上で値を変更しつつ、リストにWALレコードを蓄積します。コミット時にはこれらをまとめてCommandとしてシリアライズし、RaftNode.Applyに渡します。

func (t *Transaction) Commit() error {
    defer t.concurrencyManager.Release()
    if t.raftNode != nil && len(t.recoveryManager.PendingRecords()) > 0 {
        cmd := &dbraft.Command{
            TxNum:   t.state.txNum,
            Records: t.recoveryManager.PendingRecords(),
        }
        data, err := dbraft.MarshalCommand(cmd)
        // ...
        if err := t.raftNode.Apply(data); err != nil {
            return fmt.Errorf("raft apply for transaction %d: %w", t.state.txNum, err)
        }
    }
    // ローカルWALにCOMMITレコード書き込み
    if err := t.recoveryManager.Commit(); err != nil {
        // ...
    }
    // ...
}

その後リーダーはログに追記し、全フォロワーへ並列送信します。このとき、リーダーは以下の3つのインデックスを管理しています。

インデックス 管理単位 説明
nextIndex フォロワーごと 次にそのフォロワーへ送るべきログインデックスの楽観的な推測値。リーダー昇格時に lastIndex + 1 で初期化
matchIndex フォロワーごと そのフォロワーが実際に持っていると確認できた最後のログインデックス
commitIndex 1つだけ 過半数のノードへの複製が確認できたログの最後のインデックス。ここまでのエントリが確定済みという扱いになる。ハートビートに含めてフォロワーに伝える

リーダーはフォロワーにログを送る際、nextIndexからログ末尾までのエントリを送信します。フォロワーから成功の応答が返ればmatchIndexを更新し、matchIndexをもとにcommitIndexを計算します。失敗した場合はnextIndexを引き下げ、次回の送信で再送します。このとき、素朴にnextIndexを1ずつ減らすとフォロワーが大きく遅れている場合に何往復もかかってしまうため、失敗時のレスポンスにフォロワー側で計算したインデックスを含めるようにし、nextIndex[peer] = resp.LastLogIndex + 1 と一気にジャンプできるようにしています。フォロワーのログが短い場合はログ末尾のインデックスを、termの不一致がある場合は競合termの手前のインデックスを返します。

matchIndexだけでも正しく動きますが、その場合リーダー昇格直後のmatchIndexは全フォロワーとも0なので、最初の送信で全ログを全フォロワーに送ることになり非効率です。nextIndexを楽観的に最新に設定しておけば、フォロワーがすでに最新の場合は何も送らずに済み、遅れている場合は上記の仕組みで正しい位置まで一気にバックトラックできます。

func (n *RaftNode) broadcastAppendEntries() {
    // ...
    var wg sync.WaitGroup
    for _, peer := range n.peers {
        wg.Add(1)
        go func(peer string) {
            defer wg.Done()

            nextIdx := n.nextIndex[peer]
            entries := n.log.GetRange(nextIdx, n.log.LastIndex())

            resp, err := n.transport.AppendEntries(peer, &AppendEntriesRequest{
                Term:         term,
                LeaderID:     n.id,
                PrevLogIndex: nextIdx - 1,
                PrevLogTerm:  prevLogTerm,
                Entries:      entries,
                LeaderCommit: commitIndex,
            })
            // ...
            if resp.Success {
                if len(entries) > 0 {
                    lastEntry := entries[len(entries)-1]
                    n.nextIndex[peer] = lastEntry.Index + 1
                    n.matchIndex[peer] = lastEntry.Index
                }
            } else {
                n.nextIndex[peer] = resp.LastLogIndex + 1
            }
        }(peer)
    }
    wg.Wait()
    n.advanceCommitIndex()
}

全フォロワーへの送信が終わったら、advanceCommitIndex() でcommitIndexを進めます。各フォロワーのmatchIndexを見て、過半数のノードが持っているインデックスまでcommitIndexを更新します。Raftの核心はこの過半数の判定です。リーダー自身を含めて過半数のノードがあるインデックスのエントリを持っていれば、そのエントリは確定とみなせます。過半数同士は必ず重なりを持つため、たとえリーダーが交代しても、新リーダーは必ずコミット済みエントリを持つノードから選ばれることが保証されます。

func (n *RaftNode) advanceCommitIndex() {
    for idx := n.commitIndex + 1; idx <= n.log.LastIndex(); idx++ {
        entry, _ := n.log.GetEntry(idx)
        if entry.Term != n.currentTerm {
            // 前のTermのエントリを直接コミットしてはいけない(現在のタームでコミットできれば連鎖的にコミットされるのでそれを待つ)
            continue
        }
        count := 1 // リーダー自身
        for _, peer := range n.peers {
            if n.matchIndex[peer] >= idx {
                count++
            }
        }
        if count > (len(n.peers)+1)/2 {
            n.commitIndex = idx
        }
    }
    // commitIndexまでのエントリを実際に適用...
}

entry.Term != n.currentTermの時スキップしてる点については後述します。

フォロワー側のログ取り込み

フォロワーがリーダーからログの複製を受け取ったときの処理です。

func (n *RaftNode) HandleAppendEntries(req *AppendEntriesRequest, resp *AppendEntriesResponse) error {
    // ...
    if req.PrevLogIndex > 0 {
        // リーダーは送ろうとしてるログの直前のログが書き込まれたtermについてフォロワーと認識が揃ってるか確認する
        // 揃ってる: 帰納的に全部OK
        // 揃ってない: リーダーが認識してるログと乖離しているため偽物。リーダーはもっと古いものから送って上書きする。
        entry, ok := n.log.GetEntry(req.PrevLogIndex)
        if !ok {
            // 直前すら存在してない
            resp.LastLogIndex = n.log.LastIndex()
            return nil
        }
        if entry.Term != req.PrevLogTerm {
            // 揃ってないケース。巻き戻す
            conflictTerm := entry.Term
            idx := req.PrevLogIndex
            for idx > 1 {
                prev, ok := n.log.GetEntry(idx - 1)
                if !ok || prev.Term != conflictTerm {
                    break
                }
                idx--
            }
            resp.LastLogIndex = idx - 1
            return nil
        }
    }
    for _, entry := range req.Entries {
        existing, ok := n.log.GetEntry(entry.Index)
        if ok && existing.Term != entry.Term {
            n.log.Truncate(entry.Index)
            n.log.Append(entry)
        } else if !ok {
            n.log.Append(entry)
        }
    }
    // commitIndex更新 → データに適用
    // ...
}

まず一貫性チェックを行います。リーダーは「送信するエントリの直前(PrevLogIndex)にはこのterm(PrevLogTerm)のエントリがあるはず」という情報を一緒に送ってきます。フォロワーは自分のログと突き合わせ、一致していればその地点までログが揃っていることが帰納的に確認できるのでエントリを追記できます。

たとえば旧リーダーがterm=3でindex=5を書き込んだ直後にクラッシュし、そのエントリがフォロワーの一部にだけ残っているとします。その後選出された新リーダー(term=4)がindex=5を送る際、直前のログ(index=4)に含まれるtermを確認します。ここが一致すればindex=4までのログは揃っているので、フォロワーの古いindex=5(term=3)は新リーダーのindex=5(term=4)で上書きされます。

このチェックが食い違うのは、フォロワーが旧リーダーから受け取った未コミットのエントリを複数持っている場合です。もしindex=4のtermが食い違っていれば、リーダーはnextIndexをさらに引き下げて、一致する地点が見つかるまで遡ります。見つかれば、そこから先をリーダーのログで上書きすることでフォロワーのログが正しい状態に修復されます。

一貫性チェックが通れば、受け取ったエントリを追記します。既存のエントリとtermが異なる場合はそこから先をTruncateしてリーダーのログで上書きします。

FSM(Finite State Machine)

Raftにおいて、ログから実際のデータを更新する処理を担うのがFSM(有限状態マシン)です。ステートマシンといえば状態遷移図を思い浮かべますが、ここでの意味もそれと本質的に同じです。データベースの状態を持ち、ログエントリ(コマンド)を受け取ると状態が遷移します。同じログを同じ順序で適用すれば、どのノードでも同じ状態遷移が起き、最終的な状態は一致します。Raftはこのログの合意までを担当し、合意されたログエントリを実データに反映するのがFSMの役割です。

フォロワーがログを受け取った時点では、まだデータへの適用は行われません。フォロワーはログの取り込み後、リーダーから送られてきたcommitIndexと自分のcommitIndexを比較し、進んでいれば新たに確定したエントリをFSMに適用します。

// HandleAppendEntries の後半(ログ取り込みの後)
oldCommitIndex := n.commitIndex
if req.LeaderCommit > n.commitIndex {
    lastNewIndex := n.log.LastIndex()
    if req.LeaderCommit < lastNewIndex {
        // リーダーのコミットより自ノードのログの方が先行
        // 合意確定していないログが溜まっている通常ケース
        n.commitIndex = req.LeaderCommit
    } else {
        // 自ノードのログよりリーダーのコミットの方が先
        // フォロワーが遅れているケース。溜まった分だけコミット
        n.commitIndex = lastNewIndex
    }
}
// 新たに確定したエントリをFSMに適用
for idx := oldCommitIndex + 1; idx <= n.commitIndex; idx++ {
    entry, _ := n.log.GetEntry(idx)
    n.fsm.Apply(entry.Data, false)
}

つまりフォロワーのデータ適用は、ログの受信とは別のタイミングで起きます。リーダーが過半数の複製を確認してcommitIndexを進め、そのcommitIndexがフォロワーに伝わり、そこで初めてFSMに適用されるという2段階の流れです。今回の実装ではcommitIndexの伝播は次のtick(100msごとのハートビート)で行っているため、フォロワーへの適用にはその分のラグがあります。

FSMの適用処理自体はリーダーとフォロワーで異なります。リーダーはトランザクション実行中にすでにバッファ上でデータを変更済みなので、ディスクに書き出すだけです。一方フォロワーは、リーダーのトランザクションコンテキストを持たないため、受け取ったWALレコードを1つずつ順番にディスクに書き出します。

障害シナリオ

実際に起こりうる障害と、その時Raftではどのように復旧するかについて考えてみます。ここからが分散システムの醍醐味であり、考えるのが大変なところです。当然網羅などできないため、思いつくものをいくつか紹介します。

1. リーダーのネットワーク障害

sequenceDiagram
    participant L as Leader (term=3)
    participant F1 as Follower1
    participant F2 as Follower2

    Note over L,F2: Leaderのネットワークが一時的に不調<br/>ハートビートがFollower1, Follower2に届かない

    Note over F1,F2: 選挙タイムアウト
    Note over F1: Candidate(term=4)
    F1->>F2: RequestVote(term=4)
    F2-->>F1: VoteGranted=true
    Note over F1: Leader昇格(term=4)

    Note over L: ネットワーク復旧<br/>まだ自分がLeader(term=3)だと思っている
    L->>F1: AppendEntries(term=3)
    F1-->>L: term=4を返す
    Note over L: term=4 > term=3<br/>自分は古いリーダーだったと認識<br/>Followerに降格

リーダーのネットワークが不調でハートビートが途絶えてしまった状況です。この時フォロワーはリーダー不在と判断し新たなリーダーを選出しますが、旧リーダーはクラッシュしていたわけではないため、ネットワーク復旧後もまだ自分がリーダーだと思っています。しかしハートビートを送った際、レスポンスには送信先ノードの現在のtermが含まれているため、旧リーダーはそれを見て現在のtermは自分のtermより大きいことに気づきます。これにより自分がもう正当なリーダーではないと認識し、フォロワーに降格します。これがtermによる古いリーダーの排除の仕組みです。実装はこの辺り

仲間に話しかけたらもうお前の時代じゃねーよって言われてスンッとリーダーから降りる感じが切ない。

2. 過半数に複製される前にリーダー死亡

sequenceDiagram
    participant C as Client
    participant L as Leader (term=3)
    participant F1 as Follower1
    participant F2 as Follower2

    C->>L: INSERT ...
    Note over L: RaftLog.Append(entry) → index=5
    L->>F1: entries=[index=5]
    Note over L,L: Follower1の応答を待っている間に<br/>Leaderがクラッシュ!
    F1-->>L: (届かない)

    Note over F1,F2: 選挙タイムアウト
    Note over F2: Candidate(term=4)<br/>Follower1のログにindex=5はあるが<br/>Follower2にはない
    Note over F2: Follower2がLeaderに昇格<br/>index=5は過半数に届いていないため<br/>確定せず消える

過半数に複製される前にリーダーがクラッシュした場合、そのエントリは失われる可能性があります。Follower1はindex=5を受け取っていますが、ログに書かれているだけで過半数の合意が取れていないため確定していません。新リーダーFollower2はそれを持っていないため、Follower1のindex=5はフォロワー側のログ取り込みのセクションで説明した通り、新リーダーのログで上書きされます。仮にFollower1がリーダーに選ばれた場合、index=5は自身のログに残りますが未コミットのままなので、将来新たなtermのエントリがコミットされる過程で上書きされ得ます。クライアントにはエラーが返っているので、「成功と言われたのにデータが消えた」という事態にはなりません。

3. 過半数に複製が成功したがクライアントへのレスポンス前にリーダー死亡

sequenceDiagram
    participant C as Client
    participant L as Leader (term=3)
    participant F1 as Follower1
    participant F2 as Follower2

    C->>L: INSERT ...
    Note over L: RaftLog.Append(entry) → index=5
    L->>F1: entries=[index=5]
    L->>F2: entries=[index=5]
    F1-->>L: Success=true
    F2-->>L: Success=true
    Note over L,L: commitIndex更新後に<br/>Leaderがクラッシュ!<br/>クライアントへのレスポンスは届かない

    Note over F1,F2: 選挙タイムアウト
    Note over F1: Candidate(term=4)<br/>index=5を持っている
    F1->>F2: RequestVote
    F2-->>F1: VoteGranted=true
    Note over F1: Leader昇格<br/>index=5は全ノードにあるため確定

エントリは過半数のノードに複製されているため、新リーダーは必ずそのエントリを持つノードから選ばれます。データは失われません。ただしクライアントにはエラーが返るため、「失敗と言われたが実は成功していた」ということが起こりえます。クライアントがリトライすると同じ書き込みが二重に適用される可能性があるため、アプリケーション層でリクエストに一意なIDを付与し、サーバー側で重複を検知して無視する、といった冪等性の仕組みが必要になります。

commitIndex更新前にクラッシュした場合も、更新後にクラッシュした場合も結果は同じです。いずれの場合も、「成功を返したのにデータが消えた」ということは絶対に起きません。これがRaftの最も重要な保証です。

他にも...

他にもRaft論文では様々な障害シナリオが考慮されています。

例えばadvanceCommitIndex のコードで entry.Term != n.currentTerm のエントリをスキップしていたのも、特定の障害に対応するためです。前のtermのエントリが過半数に複製されていても、それだけを根拠にコミットすると、その後別のリーダーが選出されてエントリが上書きされる可能性があるようです。(Raft論文のFigure 8問題として知られています)。

まだ読み解き中なのであまり理解できておらず、そのうち紹介したいです。

実際に動かしてみる

トランザクションの実装にいい感じに組み込み、3台ノードで動かしてみます。起動時にはすべてのノードの情報を与える必要があります。そうでないと過半数などの判定ができません。

# node1
$ MODE=server LISTEN_ADDR=:15432 NODE_ID=node1 NODE_ADDR=:9001 PEERS=:9002,:9003 BASE_DIR=/tmp/dbdata1 go run .
2026/03/23 23:18:22 INFO starting simpledb...
2026/03/23 23:18:22 INFO raft enabled, waiting for leader election... nodeID=node1
2026/03/23 23:18:23 INFO election timeout, becoming candidate id=node1 term=0
2026/03/23 23:18:23 INFO starting election id=node1 term=1
2026/03/23 23:18:23 INFO won election id=node1 term=1 votes=2 # 当選!
2026/03/23 23:18:23 INFO leader elected leader=node1 isLeader=true
2026/03/23 23:18:23 INFO recovering database
2026/03/23 23:18:23 INFO simpledb started dir=/tmp/dbdata1 blockSize=4000 bufferSize=100
2026/03/23 23:18:23 INFO server listening addr=:15432
# node2
$ MODE=server LISTEN_ADDR=:15433 NODE_ID=node2 NODE_ADDR=:9002 PEERS=:9001,:9003 BASE_DIR=/tmp/dbdata2 go run .
2026/03/23 23:18:22 INFO starting simpledb...
2026/03/23 23:18:22 INFO raft enabled, waiting for leader election... nodeID=node2
2026/03/23 23:18:23 INFO leader elected leader=node1 isLeader=false
2026/03/23 23:18:23 INFO waiting for metadata replication from leader...
2026/03/23 23:18:23 INFO follower initialized from replicated metadata
2026/03/23 23:18:23 INFO simpledb started dir=/tmp/dbdata2 blockSize=4000 bufferSize=100
2026/03/23 23:18:23 INFO server listening addr=:15433
# node3
$ MODE=server LISTEN_ADDR=:15434 NODE_ID=node3 NODE_ADDR=:9003 PEERS=:9001,:9002 BASE_DIR=/tmp/dbdata3 go run .
2026/03/23 23:18:23 INFO starting simpledb...
2026/03/23 23:18:23 INFO raft enabled, waiting for leader election... nodeID=node3
2026/03/23 23:18:23 INFO leader elected leader=node1 isLeader=false
2026/03/23 23:18:23 INFO waiting for metadata replication from leader...
2026/03/23 23:18:23 INFO follower initialized from replicated metadata
2026/03/23 23:18:23 INFO simpledb started dir=/tmp/dbdata3 blockSize=4000 bufferSize=100
2026/03/23 23:18:23 INFO server listening addr=:15434

ログを見てみると、node1が選挙に勝利しリーダーになっているのがわかります。

実はPostgresのwireプロトコルを理解できるようにしたので、psqlクライアントからリーダーに接続してみます。

# node1
$ psql -h localhost -p 15432
psql (16.4, server 0.0.0)
WARNING: psql major version 16, server major version 0.0.
         Some psql features might not work.
Type "help" for help.

SQLを実行しデータを投入します。

# node1
user=> CREATE TABLE students (id INT, name VARCHAR(10), class VARCHAR(1));
CREATE TABLE
user=> INSERT INTO students (id, name, class) VALUES (1, "sheep", "A");
INSERT 0 1
user=> INSERT INTO students (id, name, class) VALUES (2, "goat", "B");
INSERT 0 1
user=> INSERT INTO students (id, name, class) VALUES (3, "cow", "B");
INSERT 0 1
user=> INSERT INTO students (id, name, class) VALUES (4, "cat", "C");
INSERT 0 1
user=> SELECT id, name, class FROM students;
 id | name  | class 
----+-------+-------
  1 | sheep | A
  2 | goat  | B
  3 | cow   | B
  4 | cat   | C
(4 rows)

正常に保存されていますね。ではリーダーを止めてみましょう。

# node3
2026/03/23 23:18:23 INFO server listening addr=:15434
2026/03/23 23:30:43 INFO election timeout, becoming candidate id=node3 term=1
2026/03/23 23:30:43 INFO starting election id=node3 term=2
2026/03/23 23:30:43 INFO won election id=node3 term=2 votes=2

node3が選挙に勝ち、新たなリーダーとなりました。termが進んでいるのがわかります。ではnode3に接続してクエリしてみましょう。

# node3
$ psql -h localhost -p 15434
...
user=> SELECT id, name, class FROM students;
 id | name  | class 
----+-------+-------
  1 | sheep | A
  2 | goat  | B
  3 | cow   | B
  4 | cat   | C
(4 rows)

複製されたデータが取れています!

node3はリーダーなので新たなデータを書き込めます。

# node3
user=> INSERT INTO students (id, name, class) VALUES (5, "gorilla", "D");
INSERT 0 1
user=> SELECT id, name, class FROM students;                                                                                                                   id |  name   | class 
----+---------+-------
  1 | sheep   | A
  2 | goat    | B
  3 | cow     | B
  4 | cat     | C
  5 | gorilla | D
(5 rows)

ここでnode1を復活させてみましょう。

# node1
$ MODE=server LISTEN_ADDR=:15432 NODE_ID=node1 NODE_ADDR=:9001 PEERS=:9002,:9003 BASE_DIR=/tmp/dbdata1 go run .
2026/03/23 23:38:05 INFO starting simpledb...
2026/03/23 23:38:05 INFO raft enabled, waiting for leader election... nodeID=node1
2026/03/23 23:38:05 INFO leader elected leader=node3 isLeader=false
2026/03/23 23:38:05 INFO follower initialized from replicated metadata
2026/03/23 23:38:05 INFO simpledb started dir=/tmp/dbdata1 blockSize=4000 bufferSize=100
2026/03/23 23:38:05 INFO server listening addr=:15432

リーダーがnode3であることを認識しています。

今回の実装ではフォロワーのノードにも接続できるようにしています。node1に再び繋いでみましょう。

# node1
$ psql -h localhost -p 15432
...
user=> SELECT id, name, class FROM students;
 id |  name   | class 
----+---------+-------
  1 | sheep   | A
  2 | goat    | B
  3 | cow     | B
  4 | cat     | C
  5 | gorilla | D
(5 rows)

node3に書き込んだid=5の行が読み取れています。ただしフォロワーからの読み取りはRaftを経由しないため、リーダーでのコミット後フォロワーへのcommitIndex伝播までにラグがあり、結果整合的な読み取りになります。これはRaftの制約ではなく今回の実装における設計選択で、リーダー経由の読み取りであれば必ずコミットされたものは即座に読み取れるようになります。

フォロワーで書き込もうとすると次のように失敗します。雑にトレースを出してしまってますが、MOVED node3のようにリーダーが誰かを教えてくれます。これによりクライアントからは使える適当なノードに接続し、MOVEDが返ってきたらそのノードに繋ぎ直してリトライ、といった処理ができます。有名どころで言うとRedis Clusterなどがこのような方式を採用しています。

# node1
user=> INSERT INTO students (id, name, class) VALUES (6, "monkey", "E");
ERROR:  MOVED node3: update scan: move to next available slot in block...(中略)..writes are only allowed on the leader node

ではこの状態でnode1, node2を落としてみます。node3はリーダーのままですが、フォロワーがいなくなってしまいました。

node3でSQLを実行してみます。

# node3
user=> SELECT id, name, class FROM students;
 id |  name   | class 
----+---------+-------
  1 | sheep   | A
  2 | goat    | B
  3 | cow     | B
  4 | cat     | C
  5 | gorilla | D
(5 rows)

問題ないですね。では書き込み。

# node3
user=> INSERT INTO students (id, name, class) VALUES (6, "monkey", "E");
ERROR:  raft apply for transaction 7: failed to replicate to majority

失敗しました。エラーを見ても分かる通り、この状態では過半数のノードへ複製ができず、Raftによって合意が取れないため書き込みが拒絶されます。ここが単純なプライマリ-レプリカの構成と違うところです。ネットワーク分断が起きた場合も同様で、少数派パーティションに属するノードは過半数を確保できないため書き込みを拒否します。このように一貫性を保つ代わりに可用性を犠牲にする振る舞いは、いわゆるCAP定理でいうところのCPに分類されます*1。なお、読み取りはRaftを経由しないため、過半数が死んでいても成功します。

ここで再度node1を復活させてから、もう一度node3でSQLを実行してみます。

# node3
user=> INSERT INTO students (id, name, class) VALUES (6, "monkey", "E");
INSERT 0 1
user=> SELECT id, name, class FROM students;
 id |  name   | class 
----+---------+-------
  1 | sheep   | A
  2 | goat    | B
  3 | cow     | B
  4 | cat     | C
  5 | gorilla | D
  6 | monkey  | E
(6 rows)

今度は成功しました。過半数が生きていれば書き込みが継続できることがわかります。

やってみて気づいたこと

感想パートです。実装を通じて感じたことをいくつか書きます。

リーダーとフォロワーでリカバリの仕組みが違う

設計方針のセクションで述べた通り、リカバリを行う際、リーダーはローカルWALで巻き戻し、フォロワーはRaftログを再適用します。シングルノードのDBでは1種類だったリカバリの仕組みが、分散化によってノードの役割ごとに分かれるというのは、実装して初めて腑に落ちました。

ノード構成は起動時に決め打ち

動かしてみるのセクションで見た通り、ノード構成は起動時に固定で持たせる必要があります。途中でノードが増減すると過半数となるノード数も変わるため、扱いがかなり厄介そうです。Raft論文を見た感じだとJoint Consensusという手法で上手く構成変更をするようです(まだちゃんと読み込めてない)。

エッジケースの山

少し実装しただけでもエッジケースあちこちにあることに気付きました。どのタイミングでノードやネットワークが死んだり生き返ったりするかわからない、というだけで難易度が跳ね上がります。おもちゃのようなsimpledb-goでこれなので、プロダクショングレードのシステムはこんなものでは済まないのでしょう。

理解しやすさ

RaftはPaxosなどの他の分散合意アルゴリズムと比較して理解しやすいという触れ込みがあります。実際にRaft論文では理解しやすさを最大の目標に掲げており、学習のしやすさを検証するため学生に講義ビデオを見せて小テストを実施しています。結果としてPaxosに比べてスコアが良く、アンケートでもRaftの方が理解しやすいと答えた学生が多かったそうです。なんだかソフトウェア工学の論文みたいですね。

しかし理解しやすいというのはあくまでPaxosとの比較であって、実装を書いたりAIに書かせたり、それを読み解いたりするのは普通に難しかったです。今回Raftを触ってみて、これよりさらに難しいといわれるPaxosにも興味が湧きました。

またここまで状況が複雑だと、アルゴリズム自体にバグがあるのではと疑った瞬間が何度かありましたが、RaftはTLA+で形式検証されているそうです。バグっていたのは自分のほうでした。

スケーラビリティの限界

Raftは全ノードが全データを持つため、データ量がノードのストレージを超えるとスケールしません。さらに書き込みを受けるのもリーダー1台だけなので、いずれ書き込み性能も頭打ちになってしまいそうです。

実用的にはMulti-Raftと呼ばれる手法で、データをシャーディングしてシャードごとに独立したRaftグループを持たせることで対処します。TiDBなどがこのアプローチを採用しています。

Redisとの比較

実装途中で、Redis Sentinelのリーダー選挙がRaftの選挙と似ていることに気づきました。ただ、Redisのレプリケーションは非同期なので一貫性は保証されません。フェイルオーバーのタイミングによっては書き込みが消える可能性があります。選挙の仕組みが似ていても、合意の保証レベルはまったく異なっています。

TCPとの共通点

ログ複製でリーダーがフォロワーごとに「どこまで持っているか」を追跡しながら差分を送る仕組みは、以前TCPを自作したときのことを思い出しました。TCPでもシーケンス番号とACKで相手がどこまで受け取ったかを管理しながらデータを送ります。TCPのセッション維持の仕組みも広義には分散システムと言えるので、相手の状態をエンドポイント側で管理しながら少しずつ前に進めていく構造に類似点を見出せますね。

検証の難しさ

今回は簡単な検証で済ませましたが、分散システムの正しさをテストするのは本質的に難しいと感じました。障害の起きるタイミングの組み合わせが膨大で、通常の動作検証では網羅しきれません。FoundationDBが採用している決定論的シミュレーションテストのようなアプローチを試してみたいところです。

Raftの設計の巧みさ

最後に、Raftは非常によくできたアルゴリズムだと感じました。これだけのコード量でノード障害に耐える分散合意が実現できるのは素直に美しいです。本番で使うにはスナップショットやメンバーシップ変更など足りない機能はたくさんありますが、コアの合意形成メカニズムはシンプルかつ堅牢で、実装を通じてその設計の巧みさを実感できました。

以上長々とありがとうございました。ソースコードは https://github.com/teru01/simpledb-go で公開しています。

*1:一部の側面しか切り取ってないため、あまりこれに振り回されてはいけないらしい。https://www.docswell.com/s/kumagi/K24LXG-dreadful-distributed-systems#p9