Motivation

On the importance of picking good courses and books

Models of distributed systems

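How do we deal with the situation above, where a node may lie? With only two parties there is no way to tell who is right, so every malicious node has to be outvoted by honest ones: tolerating f malicious nodes takes 3f + 1 nodes in total (fewer than one third may be malicious)

Both of the problems just discussed involve two things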

  • Node
  • Network

One more thing to consider is latency (timing)!!

  • A system model is made up of the following 3 parts

    • Network behaviour (e.g. message loss)
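      • (assume) bidirectional point-to-point communication
        • Reliable (perfect) links
          • a message is received if and only if it was sent
          • messages may be reordered
            • built from: fair-loss links + retry & deduplication
        • Fair-loss links
          • messages may be lost, duplicated, or reordered
          • but if you keep retrying, a message eventually gets through
            • built from: arbitrary links + TLS
        • Arbitrary links
          • a malicious adversary may interfere with messages (eavesdrop, modify, drop, spoof, replay)
      • Network partition
        • some links drop or delay all messages for an extended period of time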
    • Node behaviour (e.g. crashes)
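      • How a node behaves when it is faulty
        • Crash-stop (fail-stop)
          • a node that crashes stops executing for good
        • Crash-recovery (fail-recovery)
          • a crash loses the in-memory state
          • the node may resume executing some time later
        • Byzantine (fail-arbitrary)
          • a faulty node deviates from the algorithm and may do anything, including crashing or acting maliciously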
    • Timing behaviour (e.g. latency)
      • Node & Network
        • Synchronous
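          • message latency has a known upper bound
          • nodes execute the algorithm at a known, predictable speed
        • Partially synchronous
          • asynchronous for some finite (but unknown) periods of time
          • synchronous the rest of the time
        • Asynchronous
          • messages can be delayed arbitrarily
          • nodes can pause execution arbitrarily; no timing guarantees at all
      • What breaks timing assumptions on a node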
        • Operating system scheduling (priority inversion)
        • Stop-the-world garbage collection
        • Page faults, swap, thrashing
      • What breaks timing assumptions in the network
        • Message loss requiring retry
        • Congestion/contention causing queueing
        • Network/route reconfiguration
  • Failure: the system as a whole is not working

  • Fault: some part of the system is not working

    • node
      • crash (crash-stop/crash-recovery)
      • deviating from algorithm (Byzantine)
    • Network
      • dropping or significantly delaying messages
  • Failure detectors

    • Perfect failure detector
      • labels a node as faulty iff it has crashed
    • Typical implementation
      • send message
      • await response
      • label node as crashed if no reply within some timeout
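    • Problem
      • a timeout cannot tell apart
        • crashed node
        • temporarily unresponsive node
        • lost message
        • delayed message
      • so a timeout-based detector is perfect only in a
        • synchronous crash-stop system with reliable links
    • Eventually perfect failure detector
      • temporarily, it may
        • label a node as crashed even though it is correct
        • label a node as correct even though it has crashed
      • eventually, it
        • labels a node as crashed iff it has crashed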
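The "typical implementation" above (send, await response, suspect on timeout) can be sketched roughly as follows. This is a minimal illustration, not a production detector: the class name `TimeoutFailureDetector`, its methods, and the injectable `clock` parameter are all assumptions made for this example.

```python
import time


class TimeoutFailureDetector:
    """Suspect a node once nothing has been heard from it for `timeout` seconds."""

    def __init__(self, timeout, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock        # injectable so the behaviour can be tested
        self.last_heard = {}      # node id -> time of the last reply/heartbeat

    def heartbeat(self, node):
        """Record that a reply from `node` has just arrived."""
        self.last_heard[node] = self.clock()

    def suspected(self, node):
        """True if `node` has been silent for longer than the timeout.

        A node we never heard from is suspected as well. The answer may be
        wrong: a slow node or a delayed message looks just like a crash.
        """
        last = self.last_heard.get(node)
        return last is None or self.clock() - last > self.timeout
```

Because a late heartbeat clears the suspicion again, this behaves like an eventually perfect detector rather than a perfect one: it can be temporarily wrong in a partially synchronous system.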

Time, clocks, and ordering of events

  • two types of clock
    • physical clocks
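      • measure real time: count the number of seconds elapsed
      • subject to error (the clock drifts, and may jump forwards or backwards when it is adjusted)
        • Coordinated Universal Time (UTC)
          • UTC is TAI with corrections that account for the Earth's rotation
            • International Atomic Time (TAI): derived from atomic clocks
            • the speed of the Earth's rotation is not constant
        • Leap seconds
          • every year on 30 June and 31 December at 23:59:59 UTC, one of these happens (or nothing, and the clock moves on as usual)
            • negative leap second: the clock immediately jumps to 00:00:00, skipping one second
            • positive leap second: the clock moves to 23:59:60, stays there for 1 second, then moves to 00:00:00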
        • Timestamp formats
          • Unix time: number of seconds since 1 January 1970 00:00:00 UTC
            • does not count leap seconds
          • ISO 8601: year, month, day, hour, minute, second, and timezone offset relative to UTC
            • 2020-11-09T09:50:17+00:00
        • NTP
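          • Clock skew: the difference between two clocks at a point in time
          • Procedure
            • sample several servers, and estimate the round-trip delay and clock skew from the request/response timestamps
            • then adjust the local clock accordingly
        • Time-of-day clock
          • time since a fixed date (the epoch)
          • suitable for comparing timestamps across nodes (if the clocks are synchronised)
          • may suddenly jump (NTP corrections)
        • Monotonic clock
          • time since an arbitrary point (e.g. when the machine booted)
          • suitable for measuring elapsed time on a single node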
    • logical clocks
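      • count the number of events that have occurred
      • monotonically increasing
      • happens-before relation
        • Definition: a -> b (a happens before b) iff
          • same process
            • a occurred before b in that process's local execution order
          • different processes
            • a is the sending of a message and b is the receipt of that same message
          • transitivity
            • a -> b and b -> c imply a -> c
        • i.e. a path in a directed graph of events
          • a partial order
        • concurrent: neither a -> b nor b -> a
          • written a || b
      • Causality
        • concurrent means the two events cannot have affected each other
        • a -> b means a might have caused b (potential causality)
        • Causal order
          • a strict total order ≺ on events such that a -> b implies a ≺ b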
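The NTP sampling step and the monotonic-clock advice from the physical clocks part of this section can be made concrete with a small sketch. The formulas are the usual NTP estimate, which assumes the network delay is about the same in both directions; the function names `ntp_estimate` and `timed` are made up for this example.

```python
import time


def ntp_estimate(t1, t2, t3, t4):
    """Estimate (round_trip_delay, clock_skew) from one request/response pair.

    t1: request sent      (client clock)
    t2: request received  (server clock)
    t3: response sent     (server clock)
    t4: response received (client clock)
    """
    delay = (t4 - t1) - (t3 - t2)   # time spent on the network, both ways
    skew = t3 + delay / 2 - t4      # server time on arrival minus client time
    return delay, skew


def timed(fn):
    """Measure how long fn() takes, using the monotonic clock.

    The time-of-day clock could jump mid-measurement if NTP adjusts it.
    """
    start = time.monotonic()
    result = fn()
    return result, time.monotonic() - start
```

For example, `ntp_estimate(100, 160, 161, 103)` gives a round trip of 2 and a skew of 59.0: the client clock is about 59 seconds behind the server.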

Broadcast protocols and logical time

  • capture causal dependencies

    • Physical timestamps won't do!! (they can be inconsistent with causality)
    • two types of logical clocks
      • Lamport clocks
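        • Algorithm
          • each process initialises its own counter
            • t = 0
          • on a local event
            • t++
          • on sending a message m
            • t++
            • send((t, m))
          • on receiving (tx, m)
            • t = max(tx, t) + 1
            • process m
        • Properties
          • L(e) is the timestamp of event e; N(e) is the process at which e occurred
          • if a -> b then L(a) < L(b)
          • the pair (L(e), N(e)) uniquely identifies event e
          • Lamport clocks give a total order: a ≺ b iff L(a) < L(b), or L(a) = L(b) and N(a) < N(b)
        • BUT
          • events at different processes can have the same timestamp!!
            • and given L(a) < L(b) we cannot tell whether a -> b or a || b
              • because the single counter mixes our own events with everyone else's
      • Vector clocks
        • Algorithm
          • each process (call it i) initialises its own vector
            • t = [0] * len(procs)
          • on a local event
            • t[i]++
          • on sending a message m
            • t[i]++
            • send((t, m))
          • on receiving (tx, m)
            • t = [max(t[j], tx[j]) if j != i else t[i]+1 for j in range(len(procs))]
            • process m
        • Vector clocks give a partial order (not a total one): T <= T' iff T[j] <= T'[j] for every j; a -> b iff V(a) < V(b); a || b iff the two vectors are incomparable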
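The two clock algorithms above translate almost line for line into Python. The sketch below is illustrative only: the class and function names (`LamportClock`, `VectorClock`, `happens_before`, `concurrent`) are chosen for this example, and processes are assumed to be numbered 0..n-1.

```python
class LamportClock:
    def __init__(self):
        self.t = 0

    def tick(self):
        """Local event or send: increment and return the timestamp to attach."""
        self.t += 1
        return self.t

    def receive(self, t_msg):
        """Receive a message stamped t_msg."""
        self.t = max(self.t, t_msg) + 1
        return self.t


class VectorClock:
    def __init__(self, i, n):
        self.i = i              # index of the process that owns this clock
        self.t = [0] * n

    def tick(self):
        """Local event or send: bump our own entry, return a copy to attach."""
        self.t[self.i] += 1
        return list(self.t)

    def receive(self, t_msg):
        """Element-wise max with the message's vector, then bump our own entry."""
        self.t = [max(a, b) for a, b in zip(self.t, t_msg)]
        self.t[self.i] += 1
        return list(self.t)


def happens_before(a, b):
    """a -> b iff a <= b element-wise and a != b."""
    return all(x <= y for x, y in zip(a, b)) and a != b


def concurrent(a, b):
    """a || b iff the two vectors are incomparable."""
    return a != b and not happens_before(a, b) and not happens_before(b, a)
```

With vector timestamps the question "a -> b or a || b?" has a definite answer; with Lamport timestamps alone it does not.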
  • Broadcast protocols

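    • Kinds of broadcast
      • FIFO
        • messages sent by the same process are delivered in the order they were sent
          • valid orders: (m1 and m3 come from the same sender, so m1 must precede m3; m2 can go anywhere)
            • (m2, m1, m3)
            • (m1, m2, m3)
            • (m1, m3, m2)
      • Causal
        • if broadcast(m1) -> broadcast(m2), then m1 must be delivered before m2
          • valid orders: (given m1 -> m2 and m1 -> m3)
            • (m1, m2, m3)
            • (m1, m3, m2)
      • Total order
        • if m1 is delivered before m2 on one process, m1 is delivered before m2 on every process
          • valid: every process delivers
            • (m1, m2, m3)
          • also valid: every process delivers
            • (m1, m3, m2)
      • FIFO-total order
        • FIFO + Total order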
    • Broadcast algorithms
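      • two parts to take care of
        • make best-effort broadcast reliable
          • by retransmitting dropped messages
        • enforce the delivery order on top of reliable broadcast
      • Not Reliable (Naive)
        • the broadcasting process sends the message directly to every other process
        • Problem
          • what if the sender crashes halfway through? Some processes never get the message
      • Reliable
        • Eager reliable broadcast
          • the first time a process receives a message, it re-broadcasts it!!
            • to every other process
          • Problem
            • O(n^2) messages in flight for n processes!!
        • Gossip protocols
          • the first time a process receives a message, it re-broadcasts it!!
            • to a fixed number of processes (e.g. 3), picked at random
          • a good fit when there are many processes
          • Problem
            • only probabilistic: the message eventually reaches all nodes with high probability, not with certainty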
    • FIFO broadcast algorithm
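      • the key: hold each message in a buffer and deliver it only when its sequence number matches the next one expected from its sender (delivered[sender])
        • that gives FIFO: messages from the same process are delivered in the order they were sent
    • Causal broadcast algorithm
      • extends the FIFO broadcast algorithm, but
        • each message carries a vector of dependencies (like a vector clock) instead of a single sequence number, and waits in the buffer until all of them have been delivered!! (that gives causal order!!)
    • Total order broadcast algorithms
      1. Single leader (every message goes through the leader, which broadcasts it; like a BFS tree rooted at the leader)
        • single point of failure!!
      2. Lamport clocks (attach a Lamport timestamp to every message and deliver in timestamp order)
        • how do we know the message we hold has the smallest timestamp?
          • we only know once we have seen a message with a larger timestamp from every process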
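The buffering idea behind the causal broadcast algorithm above can be sketched like this. It is a simplified single-process simulation, not a networked implementation: the class name `CausalNode` and its `log` attribute are assumptions for this example, and the underlying reliable broadcast is assumed to hand every message to every node exactly once (a duplicate would be delivered twice here).

```python
class CausalNode:
    def __init__(self, i, n):
        self.i = i                  # our index among the n processes
        self.send_seq = 0           # number of messages we have broadcast
        self.delivered = [0] * n    # delivered[j] = messages delivered from j
        self.buffer = []            # received but not yet deliverable
        self.log = []               # messages handed to the application, in order

    def broadcast(self, m):
        """Build the message to hand to reliable broadcast (including to ourselves)."""
        deps = list(self.delivered)  # everything we have delivered so far...
        deps[self.i] = self.send_seq # ...plus our own earlier broadcasts
        self.send_seq += 1
        return (self.i, deps, m)

    def receive(self, msg):
        """Buffer msg, then deliver everything whose dependencies are satisfied."""
        self.buffer.append(msg)
        progress = True
        while progress:
            progress = False
            for entry in list(self.buffer):
                sender, deps, m = entry
                if all(d <= v for d, v in zip(deps, self.delivered)):
                    self.buffer.remove(entry)
                    self.log.append(m)
                    self.delivered[sender] += 1
                    progress = True
```

If node C receives m2 (which causally depends on m1) before m1, m2 simply waits in `buffer` until m1 has been delivered.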

Replication

  • Use best-effort broadcast
    • Idempotence

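      • an operation is idempotent if applying it many times has the same effect as applying it once (f(f(x)) = f(x)), so retries need no deduplication
      • retry behaviour
        • At-most-once
          • send once, never retry (the update may be lost)
        • At-least-once
          • retry until acknowledged (the update may be applied more than once)
        • Exactly-once
          • retry +
            • idempotence
            • or deduplication
      • even an idempotent operation still changes the state, so a retry can interfere with other clients' updates
        • in the lecture's example, client 1's retried add is applied again after client 2's remove, so the final state never reflects client 2 having removed what client 1 added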
    • Timestamps and tombstones (soft delete)

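      • each record carries a flag marking whether it has been deleted (a tombstone), instead of being physically removed

      • Reconciling replicas

        • each record also carries a timestamp of when it was written, so replicas can keep the newer version when they sync up
      • Concurrent writes by different clients

        • two approaches
          • Last writer wins
            • keep the value with the largest timestamp
              • needs a total order (e.g. Lamport clock)
            • beware: the other concurrent writes are silently lost
          • Multi-value register
            • if the timestamps are comparable, keep the larger; if not (concurrent), keep both
              • needs only a partial order (e.g. vector clock)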
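    • Quorum

      • Read-after-write consistency
      • a read/write counts as successful once the required number of replicas (the read quorum r / write quorum w) have responded
        • r + w > n (n = number of replicas), so every read quorum overlaps every write quorum
          • typically a majority: (n+1)/2, rounded up
          • writes tolerate n - w unavailable replicas, reads tolerate n - r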
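A rough sketch of timestamped quorum reads and writes, tying together the timestamp and quorum points above. Everything here is simplified for illustration: replicas are plain dicts in one process, the caller passes the list of `reachable` replica indexes, and the function names are invented for this example. Timestamps are assumed to be unique and totally ordered.

```python
def majority(n):
    """Smallest quorum size that guarantees any two quorums overlap."""
    return n // 2 + 1


def quorums_overlap(n, w, r):
    """Read-after-write consistency needs r + w > n."""
    return r + w > n


def quorum_write(replicas, reachable, key, value, ts, w):
    """Store (ts, value) on the reachable replicas; succeed only with w acks."""
    if len(reachable) < w:
        return False
    for i in reachable:
        current = replicas[i].get(key)
        if current is None or ts > current[0]:   # keep only the newest version
            replicas[i][key] = (ts, value)
    return True


def quorum_read(replicas, reachable, key, r):
    """Ask the reachable replicas and return the value with the largest timestamp."""
    if len(reachable) < r:
        raise RuntimeError("read quorum not reached")
    found = [replicas[i][key] for i in reachable if key in replicas[i]]
    if not found:
        return None
    return max(found, key=lambda tv: tv[0])[1]
```

With n = 3 and w = r = 2, any read quorum contains at least one replica that saw the latest successful write, which is why the read can pick the newest timestamp.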
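  • Use Total order broadcast (every process delivers the messages in the same order!!)
    • State machine replication
      • send update messages via FIFO-total order broadcast: every replica delivers all of them, in the same order
        • can we use a weaker broadcast?
          • then the ordering guarantee is gone!!
            • but that is fine if the order of updates does not affect the final result
              • commutative: f(g(x)) = g(f(x))
      • so an update message can be seen as a state transition and a replica as a deterministic state machine (DFA)
        • same input, same output: deterministic
      • Limitations
        • a replica cannot update its state immediately; it has to wait for the message to be delivered
        • needs fault-tolerant total order broadcast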
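A tiny sketch of the state-machine view above: a replica is a deterministic function of the sequence of updates it delivers. The `Replica` class and the two operations (`set`, `incr`) are invented for this illustration.

```python
class Replica:
    """A deterministic state machine: same updates in the same order, same state."""

    def __init__(self):
        self.state = {}

    def deliver(self, update):
        op, key, value = update
        if op == "set":          # overwrites: does NOT commute with other updates
            self.state[key] = value
        elif op == "incr":       # additions commute with each other
            self.state[key] = self.state.get(key, 0) + value


def replay(updates):
    """Feed a sequence of updates to a fresh replica and return its final state."""
    replica = Replica()
    for update in updates:
        replica.deliver(update)
    return replica.state
```

Replaying `[("set", "x", 1), ("incr", "x", 5)]` and its reverse gives different states, so those updates need total order broadcast; a stream of `incr` updates alone would converge under any delivery order.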

Consensus

  • Fault-tolerant total order broadcast
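    • in practice total order broadcast relies on a leader!!
      • what if the leader fails?
        • the nodes elect a new one themselves
          • use a failure detector (timeout) to check whether the leader has failed
            • if it has, elect the next one
            • make sure there is only one leader
              • use a term number to tell apart, for each term, who the voters are and who the leader is
                • needs a quorum (a majority: more than half of the nodes)
                • each process can vote at most once per term
              • this guarantees at most one leader per term
                • before delivering a message the leader must get acks from a quorum, otherwise a stale leader that has not heard of a newer term could deliver on its own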
    • Consensus and total order broadcast
      • Consensus: all nodes agree on a single value
        • total order broadcast: all nodes agree on which message to deliver next
      • Consensus and total order broadcast are formally equivalent
    • Common consensus algorithms: Paxos, Multi-Paxos, Raft
  • Distributed mutual exclusion
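    • Approaches
      • central lock server
        • the lock server is a bottleneck
        • single point of failure
          • how to elect a new one
      • token passing
        • a token is passed around the nodes (which must be linked into a ring); whoever holds the token holds the lock
        • single point of failure
          • how to rebuild the ring
          • how to regenerate the token
      • Totally ordered multicast
        • only one node may hold the lock, so the requester asks everyone to vote and needs all N-1 other nodes to grant it
          • Raft only needs a majority
        • concurrent requests (two or more nodes want the lock)
          • break the tie by comparing pids
            • cf. the dining philosophers problem!!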
  • Consensus system models
    • the assumed system model is
      • partially synchronous
        • not asynchronous (FLP result)
          • in an asynchronous crash-stop system model
            • there is no deterministic consensus algorithm that is guaranteed to terminate
        • clocks are used only
          • for timeouts/failure detector
            • to ensure progress
          • not for safety (correctness)
      • crash-recovery
  • Raft
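    • State (role) transitions (follower -> candidate -> leader; back to follower on seeing a higher term)
    • Components
      • log: the messages the leader has broadcast, or the follower has received (array of entries)
        • msg
        • term: the term in which the msg was broadcast
      • term: the current term number
        • this is what most checks compare first
      • sentLength: how much of the log the leader has sent to each follower
      • ackedLength: how long each follower has reported its log to be
      • commitLength: how many log entries have actually been delivered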
# Raft as total order broadcast, following the components listed above.
# Assumed to be supplied by the runtime (not defined here):
#   nodes                    list of all node ids, including our own
#   _send(node_id, msg)      send a message tuple to a node
#   _deliver(msg)            hand a committed message to the application
#   _forward_to_leader(msg)  pass a broadcast request on to the current leader
#   _start_election_timer() / _stop_election_timer()
# (single leading underscore: a double underscore would be name-mangled inside the class)
from collections import namedtuple

Entry = namedtuple("Entry", ["msg", "term"])  # one log entry


def quorum():
    # strict majority; (n+1)//2 would accept only 2 out of 4 nodes
    return len(nodes) // 2 + 1


class Node:
    def __init__(self, node_id="whatever"):
        # state that must survive a crash (kept in stable storage)
        self.id = node_id
        self.Term = 0
        self.votedFor = None
        self.log = []
        self.commitLength = 0
        self.init_runtime_state()

    def init_runtime_state(self):
        # volatile state, reset after every crash recovery
        self.Role = "follower"
        self.Leader = None
        self.votesReceived = set()
        self.sentLength = {n: 0 for n in nodes}
        self.ackedLength = {n: 0 for n in nodes}

    def others(self):
        return [n for n in nodes if n != self.id]

    def lastLogTerm(self):
        return self.log[-1].term if self.log else 0

    def when_leader_fail_OR_election_timeout(self):
        self.Term += 1
        self.Role = "candidate"
        self.votedFor = self.id
        self.votesReceived = {self.id}  # forget votes from earlier elections
        for n in nodes:
            _send(n, ("VoteRequest", self.id, self.Term, len(self.log), self.lastLogTerm()))
        _start_election_timer()

    def when_recv_VoteRequest(self, cId, cTerm, cLogLen, cLogTerm):
        if cTerm > self.Term:
            # a newer term always makes us step down and forget our old vote
            self.Term, self.Role, self.votedFor = cTerm, "follower", None

        myLogTerm = self.lastLogTerm()
        isLargeLogTerm = cLogTerm > myLogTerm
        inSameLogTerm = cLogTerm == myLogTerm
        hasMoreLog = cLogLen >= len(self.log)
        # the candidate's log must be at least as up to date as ours
        logOK = isLargeLogTerm or (inSameLogTerm and hasMoreLog)

        inSameTerm = cTerm == self.Term
        notVoted_OR_voteSame = self.votedFor in {cId, None}
        termOK = inSameTerm and notVoted_OR_voteSame

        if logOK and termOK:
            self.votedFor = cId
            _send(cId, ("VoteResponse", self.id, self.Term, True))
        else:
            _send(cId, ("VoteResponse", self.id, self.Term, False))

    def when_recv_VoteResponse(self, vId, vTerm, isAgree):
        if self.Role == "candidate" and self.Term == vTerm and isAgree:
            self.votesReceived.add(vId)
            if len(self.votesReceived) >= quorum():
                self.Role, self.Leader = "leader", self.id
                _stop_election_timer()
                for n in self.others():
                    self.sentLength[n], self.ackedLength[n] = len(self.log), 0
                    self.copyLogTo(n)
        elif vTerm > self.Term:
            self.Role, self.Term, self.votedFor = "follower", vTerm, None
            _stop_election_timer()

    def broadcast(self, msg):
        if self.Role == "leader":
            self.log.append(Entry(msg, self.Term))
            self.ackedLength[self.id] = len(self.log)
            for n in self.others():
                self.copyLogTo(n)
        else:
            _forward_to_leader(msg)

    def periodically_do(self):
        # doubles as the heartbeat and as retransmission of lost LogRequests
        if self.Role == "leader":
            for n in self.others():
                self.copyLogTo(n)

    def copyLogTo(self, n):
        i = self.sentLength[n]
        diffLogs = self.log[i:]
        # term of the last entry the follower is assumed to hold already
        prevFollowerTerm = self.log[i-1].term if i > 0 else 0
        _send(n, ("LogRequest", self.id, self.Term, i, prevFollowerTerm, self.commitLength, diffLogs))

    def when_recv_LogRequest(self, lId, lTerm, followerLogStart, followerLogTerm, lCommitLen, diffLogs):
        if lTerm > self.Term:
            self.Term, self.votedFor = lTerm, None
            _stop_election_timer()
        if lTerm == self.Term:
            # a leader exists for this term, so give up any candidacy
            self.Role, self.Leader = "follower", lId

        # our log must already contain the prefix the leader assumes we have
        largerLog = len(self.log) >= followerLogStart
        isFreshStart = followerLogStart == 0
        isLogStartInSameTerm = (largerLog and not isFreshStart
                                and self.log[followerLogStart-1].term == followerLogTerm)
        logOK = largerLog and (isFreshStart or isLogStartInSameTerm)

        if self.Term == lTerm and logOK:
            self.patchDiff(followerLogStart, lCommitLen, diffLogs)
            ack = len(diffLogs) + followerLogStart
            _send(lId, ("LogResponse", self.id, self.Term, ack, True))
        else:
            _send(lId, ("LogResponse", self.id, self.Term, 0, False))

    def patchDiff(self, start, lCommitLen, diff):
        # shrink log: compare the last entry that overlaps with diff and,
        # if the terms differ, drop our conflicting suffix
        if diff and len(self.log) > start:
            last = min(len(self.log), start + len(diff)) - 1
            if self.log[last].term != diff[last - start].term:
                self.log = self.log[:start]

        # append whatever we are still missing
        if start + len(diff) > len(self.log):
            self.log += diff[len(self.log) - start:]

        if lCommitLen > self.commitLength:
            for msg, _ in self.log[self.commitLength:lCommitLen]:
                _deliver(msg)
            self.commitLength = lCommitLen

    def when_recv_LogResponse(self, fId, fTerm, ack, good):
        if self.Term == fTerm and self.Role == "leader":
            if good and ack >= self.ackedLength[fId]:
                self.sentLength[fId] = self.ackedLength[fId] = ack
                self.commitLogEntries()
            elif self.sentLength[fId] > 0:
                # we assumed too long a prefix: step back one entry and retry
                self.sentLength[fId] -= 1
                self.copyLogTo(fId)
        elif fTerm > self.Term:
            self.Term, self.Role, self.votedFor = fTerm, "follower", None
            _stop_election_timer()

    def commitLogEntries(self):
        acks = lambda l: sum(1 for n in nodes if self.ackedLength[n] >= l)
        newCommitLen = max((l for l in range(1, len(self.log) + 1) if acks(l) >= quorum()), default=0)

        # a leader only commits entries from its own term directly
        if newCommitLen > self.commitLength and self.log[newCommitLen-1].term == self.Term:
            for msg, _ in self.log[self.commitLength:newCommitLen]:
                _deliver(msg)
            self.commitLength = newCommitLen

Replica consistency

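  • What "consistency" means in different contexts
    • ACID
      • a transaction takes the database from one consistent state to another consistent state
        • consistent: satisfying application-specific invariants
    • Read-after-write consistency
    • Replication: every replica should be consistent with the others
      • the same state? As of which point in time?
      • reads should return the same result whichever replica they hit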
  • Atomic commit
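    • an ACID transaction
      • either commits or aborts
        • commit: durable (its effects are visible from then on)
        • abort: no visible side-effects
      • so a transaction spanning several databases must also either commit on all of them or abort on all of them
    • Two-phase commit
      • what if the coordinator crashes while a participant is waiting for its commit-or-abort decision?
        • the participant can only wait for the coordinator to come back
      • Fault-tolerant two-phase commit
        • the commit request carries the list of all replicas involved
          • so whoever notices that one of those replicas has crashed can vote abort on its behalf
            • votes are disseminated by total order broadcast
        • then wait for the votes to come back
          • all ok: commit
          • anything went wrong: abort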
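The basic (non-fault-tolerant) two-phase commit flow can be sketched as below. This is an in-process toy under stated assumptions: the `Participant` class, its `can_commit` flag, and the `two_phase_commit` function are made up for this example, and the coordinator is assumed never to crash, which is exactly the weakness noted above.

```python
class Participant:
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit   # stands in for local constraint checks
        self.state = "active"

    def prepare(self):
        """Phase 1: vote. After voting yes we may no longer abort on our own."""
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"


def two_phase_commit(participants):
    """Coordinator: commit only if every participant votes yes."""
    votes = [p.prepare() for p in participants]          # phase 1: collect votes
    decision = "commit" if all(votes) else "abort"
    for p in participants:                               # phase 2: announce decision
        if decision == "commit":
            p.commit()
        else:
            p.abort()
    return decision
```

A single "no" vote aborts the transaction everywhere, which is what keeps the outcome atomic across databases.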
  • Linearizability (strong consistency)
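    • makes operations on many nodes behave like atomic operations on a single copy
      • every operation returns an up-to-date result
    • not happens-before
      • a get that starts after a set has finished must see the result of that set (Linearizability)
      • client1 and client2 never send/recv to each other (so there is no happens-before between their operations)
        • so logical clocks such as Lamport clocks cannot capture it!!
        • the ordering is defined by real (physical) time!!
      • Operations overlapping in time
        • may take effect in either order; what matters is that once a result has been seen, later operations see it too
    • Serializability & Linearizability
      • Linearizability: every read gets the latest result (cf. cache coherence)
      • Serializability: transactions that run concurrently behave as if they had run in some serial order (cf. memory model)
      • in the lecture's quorum example, the results client2 and client3 get are not consistent with real-time order (not linearizable)!!
      • Techniques
        • linearizable get
          • quorum read, then write the latest value back to a quorum (read repair) before returning
        • linearizable set
          • blind write to quorum
    • Linearizable compare-and-swap (CAS)
      • total order broadcast
    • advantages
      • a distributed system that does not feel distributed
      • which makes it simple to use
    • Downsides
      • Performance: lots of messages and lots of waiting
      • Scalability: the leader is a bottleneck
      • Availability: if a quorum cannot be reached, no operation can be processed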
  • Eventual consistency
    • The CAP theorem
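      • when the network can fail (network partition), only one of these can be guaranteed
        • Consistent (linearizable)
        • Available
          • do not wait; just answer from the local, possibly stale, data
    • if there are no further updates, all replicas eventually end up in the same state
    • Strong eventual consistency
      • Convergence: replicas that start from the same state and have processed the same set of updates (in any order) are in the same state
      • Eventual delivery: an update applied at one replica is eventually applied at every replica
    • Properties
      • no waiting on network communication
      • causal broadcast (or something weaker) is enough to disseminate updates
      • Concurrent updates => fine as long as conflicts can be resolved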

Concurrency control in applications

  • Conflicts due to concurrent updates

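    • Solutions
      • Conflict-free Replicated Data Types (CRDTs)

        • in the lecture's example, essentially a dict whose entries carry timestamps
        • Approaches
          • Operation-based
            • what gets broadcast is the operation (e.g. set)
              • reliable broadcast (every operation must arrive, but in any order)
                • so the operations (set) must be commutative
            • typically has smaller messages
            • Example: Operation-based text CRDT
              • init & read
                  • elementAt is array[i], implemented on top of a set
              • every character gets a position inside an interval; insert picks a position between its two neighbours (bisection)
              • delete removes the tuple from the state
              • causal broadcast
                • the insert has to arrive before the delete of the same character
          • State-based
            • what gets broadcast is the state (all the values)
              • best-effort broadcast (a lost message does not matter)
                • reliable broadcast used to guarantee delivery, but that guarantee is gone now (a state may be lost or arrive twice), so merge must be
                  • Idempotent
                • what remains is what reliable broadcast already required (any order), so merge must be
                  • Commutative
                  • but a merge now folds in many updates at once (operation-based swaps two at a time), so merge must also be
                    • Associative
            • can tolerate message loss/duplication
      • Operational Transformation (OT)

        • operations are recorded so that concurrent ones can later be transformed and re-applied in a consistent way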
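For the state-based approach, the three required properties of merge can be checked on a small last-writer-wins map. This is a sketch under assumptions, not the lecture's exact algorithm: the state is a dict `{key: (timestamp, value)}`, timestamps are `(counter, node_id)` pairs assumed to be globally unique, and the function name `merge` is chosen for this example.

```python
def merge(a, b):
    """Merge two LWW maps {key: (timestamp, value)}; the larger timestamp wins.

    Timestamps are assumed unique, e.g. (lamport_counter, node_id), so there
    is never a tie between two different values.
    """
    out = dict(a)
    for key, (ts, value) in b.items():
        if key not in out or ts > out[key][0]:
            out[key] = (ts, value)
    return out
```

Because merge takes a per-key maximum, it is idempotent (`merge(a, a) == a`), commutative (`merge(a, b) == merge(b, a)`), and associative, so replicas can exchange states over best-effort broadcast in any order, any number of times, and still converge.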
  • Consistent snapshots

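    • whatever happened earlier (transactions included) is visible to whatever comes later (transactions included)
      • consistent with causality
        • every transaction has to be consistent with causality
          • linearizability depends on real-time order
              • so the clock error has to be compensated for
    • Approach: multi-version concurrency control (MVCC)
      • every write creates a new version tagged with the corresponding timestamp
      • a read-only transaction is pinned to a single point in time (its snapshot timestamp)
        • a read returns the latest version that is not newer than that timestamp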
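The MVCC read rule above can be sketched with a toy versioned store. The class name `MVCCStore` and its methods are assumptions for this example; commit timestamps are assumed to be unique, and a version written exactly at the snapshot timestamp is treated as visible, which is one possible reading of the notes.

```python
class MVCCStore:
    def __init__(self):
        self.versions = {}   # key -> list of (commit_ts, value), sorted by commit_ts

    def write(self, key, value, commit_ts):
        """Every write adds a new version; older versions are kept."""
        versions = self.versions.setdefault(key, [])
        versions.append((commit_ts, value))
        versions.sort(key=lambda v: v[0])

    def read(self, key, snapshot_ts):
        """Return the latest version with commit_ts <= snapshot_ts, else None."""
        result = None
        for commit_ts, value in self.versions.get(key, []):
            if commit_ts > snapshot_ts:
                break
            result = value
        return result
```

A read-only transaction picks one `snapshot_ts` and uses it for all its reads, so it sees a consistent snapshot even while newer versions are being written.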
  • snapshot of system-wide state

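    • Consistent cuts
      • a cut splits the events into "before" and "after"
        • the events inside the cut are closed under happens-before: if b is in the cut and a -> b, then a is in the cut too
          • (apart from the starting events :-), which have nothing before them), hence consistent
      • in other words, the events on the left-hand side come in only two kinds
        • send
        • recv whose matching send is also inside the cut (on the left-hand side)
      • a cut that contains a recv event f but not its send is not consistent
    • Collecting a snapshot of system-wide state
      • each node records its local state at the point where a consistent cut passes through it
        • the local states can later be combined into a global state
      • how to mark the cut?
        • send marker messages in causal order
          • on receiving its first marker, a node records its state
            • and starts recording messages on its incoming channels, except the channel the marker arrived on
              • because the process at the other end had already recorded its state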
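The consistent-cut condition above (every recv inside the cut has its send inside the cut) is easy to check mechanically. In this sketch the representation is an assumption made for illustration: each process is a list of event names in local order, a cut is the number of events taken from each process, and `messages` pairs each send event with its recv event.

```python
def is_consistent_cut(processes, cut, messages):
    """Check whether a cut is consistent.

    processes: {pid: [event, ...]} in local execution order
    cut:       {pid: number of leading events of pid inside the cut}
    messages:  list of (send_event, recv_event) pairs
    """
    inside = {e for pid, events in processes.items() for e in events[:cut[pid]]}
    # every receive inside the cut must have its send inside the cut too
    return all(send in inside for send, recv in messages if recv in inside)
```

Taking a prefix of each process's events already respects local order, so only the message edges need checking. A cut that contains a send but not the matching recv is still consistent: the message is simply in flight.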

Ref

  • Linearizability versus Serializability
  • Distributed Systems (a good resource: if the slides are unclear, there are lecture notes as well)
  • Lecture 13: Vector clocks, consistent cuts, process groups, and distributed mutual exclusion