Motivation
On the importance of picking good courses and books
Models of distributed systems
How do we handle the lying scenario above? With only two parties you cannot tell who is wrong, so every traitor must be outvoted by two honest nodes.
Both of the problems above involve two components:
- Node
- Network
And there is one more thing to consider: latency!!
A system model consists of the following three parts:
- Network behaviour (e.g. message loss)
  - (assume) bidirectional point-to-point communication
  - Reliable (perfect) links
    - a message is received iff it was sent
    - messages may be reordered
  - Fair-loss links + retry & deduplication give reliable links
    - Fair-loss links
      - messages may be lost, duplicated, or reordered
      - but if you keep retrying, a message eventually gets through
  - Arbitrary links + TLS give fair-loss links
    - Arbitrary links
      - a malicious party may interfere in the middle
  - Network partition
    - some links drop or significantly delay messages for a while
- Node behaviour (e.g. crashes)
  - when a node is faulty
    - Crash-stop (fail-stop)
      - the node crashes and simply stops
    - Crash-recovery (fail-recovery)
      - a crash loses the in-memory state
      - the node comes back by itself after a while
    - Byzantine (fail-arbitrary)
      - anything at all can happen
- Timing behaviour (e.g. latency)
  - Node & Network
    - Synchronous
      - latency has an upper bound
      - node execution speed is predictable
    - Partially synchronous
      - asynchronous during some periods (they end, but you do not know when)
      - synchronous the rest of the time
    - Asynchronous
      - latency is unbounded
      - a node may also pause at any moment
  - Node surprises
    - Operating system scheduling (priority inversion)
    - Stop-the-world garbage collection
    - Page faults, swap, thrashing
  - Network surprises
    - Message loss requiring retry
    - Congestion/contention causing queueing
    - Network/route reconfiguration
Failure: the whole system stops working
Fault: one part of it stops working
- node faults
  - crash (crash-stop/crash-recovery)
  - deviating from algorithm (Byzantine)
- network faults
  - dropping or significantly delaying messages
Failure detectors
- Perfect failure detector
  - labels a node as faulty iff it has crashed
  - Typical implementation
    - send message
    - await response
    - label node as crashed if no reply within some timeout
  - Problem
    - a timeout alone cannot distinguish between a
      - crashed node
      - temporarily unresponsive node
      - lost message
      - delayed message
    - so it is only perfect in a
      - synchronous crash-stop system with reliable links
- Eventually perfect failure detector (a sketch follows this list)
  - temporarily, it
    - may label a node as crashed even though it is correct
    - may label a node as correct even though it has crashed
  - eventually, it
    - labels a node as crashed iff it has crashed
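A minimal sketch of an eventually perfect failure detector, assuming some transport delivers ping replies to on_reply and calls check periodically (both hypothetical hooks, not from the notes): suspect after a timeout, and widen the timeout whenever a suspicion turns out to be premature, so false suspicions eventually stop.

import time

class EventuallyPerfectFailureDetector:
    def __init__(self, nodes, initial_timeout=1.0):
        self.timeout = {n: initial_timeout for n in nodes}    # per-node timeout
        self.last_reply = {n: time.monotonic() for n in nodes}
        self.suspected = set()

    def on_reply(self, node):
        self.last_reply[node] = time.monotonic()
        if node in self.suspected:       # we suspected wrongly: node is alive
            self.suspected.discard(node)
            self.timeout[node] *= 2      # back off so this mistake stops recurring

    def check(self):                     # call periodically
        now = time.monotonic()
        for node, t in self.timeout.items():
            if now - self.last_reply[node] > t:
                self.suspected.add(node)  # possibly a false positive, for now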
Time, clocks, and ordering of events
- two types of clock
  - physical clocks
    - actual time: counts seconds elapsed
    - has error (and may jump forwards or backwards when adjusted)
    - Coordinated Universal Time (UTC)
      - UTC is TAI corrected for the Earth's rotation speed
        - International Atomic Time (TAI): from atomic clocks
        - the Earth's rotation speed is not constant
      - Leap seconds
        - applied on 30 June or 31 December at 23:59:59 UTC
        - negative leap second: skip straight over that second
        - positive leap second: go to 23:59:60, wait 1 second, then move on
      - timestamp formats
        - Unix time: number of seconds since 1 January 1970 00:00:00 UTC
          - does not count leap seconds
        - ISO 8601: year, month, day, hour, minute, second, and timezone offset relative to UTC
          - 2020-11-09T09:50:17+00:00
      - NTP
        - Clock skew: the difference between two clocks
        - Approach (see the sketch after this section)
          - sample several servers, run a formula over the timestamps
          - then start adjusting the local clock
      - Time-of-day clock
        - counts time since a fixed starting point
        - good for comparing timestamps across machines
        - may jump around (NTP corrections)
      - Monotonic clock
        - starts counting from an arbitrary point
        - good for measuring elapsed time on one machine
  - logical clocks
    - count how many events have happened, not seconds
    - monotonically increasing
    - happens-before relation (i.e. a path in the directed graph of events)
      - definition: a -> b iff
        - same process
          - a occurs before b
        - different processes
          - a sends a msg and b receives it
        - transitivity
          - a -> b -> c: a -> c
      - so it really is just a path in a directed graph
      - partial order
        - concurrent: neither a -> b nor b -> a
          - written a || b
    - Causality
      - concurrent means the two definitely have no causal relationship
      - a happens-before edge means there may be a causal relationship
    - physical timestamps, by contrast, give a strict total order on events
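The "formula" the NTP bullet alludes to is the standard offset/delay estimate from one request/response exchange; a minimal sketch:

def ntp_estimate(t1, t2, t3, t4):
    """t1: client sends request, t2: server receives it,
    t3: server sends reply, t4: client receives it."""
    offset = ((t2 - t1) + (t3 - t4)) / 2   # estimated skew vs the server
    delay = (t4 - t1) - (t3 - t2)          # round-trip network delay
    return offset, delay

NTP samples several servers this way, prefers the estimates with the lowest delay, and only then starts slewing or stepping the local clock.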
Broadcast protocols and logical time
capture causal dependencies
- No Physical timestamps!!
- two types of logical clocks (both sketched in code after this list)
  - Lamport clocks
    - Algorithm
      - each process initialises its own time
        t = 0
      - on a local event
        t++
      - on sending some m
        t++
        send((t, m))
      - on receiving some (tx, m)
        t = max(tx, t) + 1
        - then process m
    - Properties
      - L(e) is the timestamp of event e; N(e) is the process e occurred on
      - the pair (L(e), N(e)) uniquely identifies event e
      - so Lamport clocks can be used to define a total order
      - BUT
        - timestamps from different processes can collide!!
          - cannot tell a -> b from a || b
            - because each process mixes everyone else's time into its own counter
  - Vector clocks
    - Algorithm
      - each process (call it i) initialises its own time
        t = [0] * len(procs)
      - on a local event
        t[i]++
      - on sending some m
        t[i]++
        send((t, m))
      - on receiving some (tx, m)
        t = [max(t[j], tx[j]) if j != i else t[i]+1 for j in range(len(procs))]
        - then process m
    - vector clocks define a partial order, and it matches happens-before exactly
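A minimal runnable version of both algorithms above; the comparison helpers at the bottom are the only additions beyond the notes.

class LamportClock:
    def __init__(self):
        self.t = 0

    def tick(self):            # local event or send
        self.t += 1
        return self.t

    def recv(self, t_msg):     # merge the sender's timestamp on receive
        self.t = max(self.t, t_msg) + 1
        return self.t

def vc_leq(a, b):
    """Vector clock order: a <= b iff every component of a is <= b's."""
    return all(x <= y for x, y in zip(a, b))

def happens_before(a, b):
    return vc_leq(a, b) and a != b

def concurrent(a, b):
    return not vc_leq(a, b) and not vc_leq(b, a)

# e.g. happens_before([1,0,0], [2,1,0]) is True; concurrent([1,0,0], [0,1,0]) is True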
Broadcast protocols
- types of broadcast
  - FIFO
    - msgs from the same process are delivered in the order they were sent
    - valid orders (m1 must precede m3; anything else goes):
      - (m2, m1, m3)
      - (m1, m2, m3)
      - (m1, m3, m2)
  - Causal
    - if broadcast(m1) -> broadcast(m2), then m1 must be delivered before m2
    - valid orders (given m1 -> m2 and m1 -> m3):
      - (m1, m2, m3)
      - (m1, m3, m2)
  - Total order
    - if m1 is delivered first on one process, m1 is delivered first on every process
    - valid orders (every process must pick the same one):
      - (m1, m2, m3) everywhere, or
      - (m1, m3, m2) everywhere
  - FIFO-total order
    - FIFO + Total order
- Broadcast algorithms
  - two parts to handle
    - turn best-effort broadcast into reliable broadcast
      - via retransmission
    - preserve the send (delivery) order
  - Not reliable (naive)
    - just send to every process directly
    - Problem
      - what if the sender crashes partway through?
  - Reliable
    - Eager reliable broadcast
      - every process re-broadcasts a msg the first time it receives it!!
        - to all processes
      - Problem
        - O(n^2) msgs in flight in total!!
    - Gossip protocols
      - every process re-broadcasts a msg the first time it receives it!!
        - to 3 processes, chosen at random
      - suited to systems with many processes
      - Caveat
        - only eventually reaches all nodes, and only with high probability
  - FIFO broadcast algorithm (see the sketch after this section)
    - key point: a buffered msg is delivered only once its sequence number matches what this process has already delivered from that sender
      - that is exactly FIFO: msgs from the same process come out in order
  - Causal broadcast algorithm
    - extends the FIFO broadcast algorithm, but
      - with vector clocks instead of sequence numbers!! (which yields causal order!!)
  - Total order broadcast algorithms
    - Single leader (relaying along a BFS tree)
      - single point of failure!!
    - Lamport clocks (deliver in Lamport-timestamp order)
      - how do I know the msg I hold right now is the smallest?
        - only once a msg with a larger stamp has arrived from every process
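A minimal sketch of the FIFO broadcast algorithm above, assuming reliable_broadcast and deliver are supplied by the layers below and above (hypothetical hooks):

class FifoBroadcast:
    def __init__(self, my_id, nodes):
        self.my_id = my_id
        self.seq = 0                           # sequence number for my own msgs
        self.expected = {n: 0 for n in nodes}  # next seq expected per sender
        self.buffer = []                       # holdback queue

    def broadcast(self, msg):
        reliable_broadcast((self.my_id, self.seq, msg))
        self.seq += 1

    def on_receive(self, sender, seq, msg):
        self.buffer.append((sender, seq, msg))
        delivered = True
        while delivered:                       # drain everything deliverable
            delivered = False
            for entry in list(self.buffer):
                s, n, m = entry
                if n == self.expected[s]:      # the next msg expected from s
                    deliver(s, m)
                    self.expected[s] += 1
                    self.buffer.remove(entry)
                    delivered = True

The causal variant replaces the per-sender counters with vector clocks, as noted above.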
Replication
- Use best-effort broadcast
Idempotence
- an operation that needs no dedup even if applied multiple times (tiny example below)
- retry behaviour
  - At-most-once
    - no retry
  - At-least-once
    - retry until an ack arrives
  - Exactly-once
    - retry +
      - idempotence, or
      - deduplication
- even with idempotence, a retry still touches the state
  - e.g. in the slides' example, client2 can end up never observing the state in which the data client1 added had been removed
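A two-line illustration of why retries are only safe for idempotent operations:

s = set()
s.add("x"); s.add("x")   # idempotent: a retried add leaves the same state
n = 0
n += 1; n += 1           # not idempotent: a retried increment counts twice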
Timestamps and tombstones (soft delete)
each record carries a flag marking whether it has been deleted
Reconciling replicas
- each record also carries a timestamp recording when it was written
Concurrent writes by different clients
- two approaches (both sketched after this list)
  - Last writer wins
    - keep the value with the largest timestamp
      - needs a total order (e.g. Lamport clock)
    - beware of data loss
  - Multi-value register
    - if the timestamps are comparable, keep the larger; if not, keep both
      - partial order (e.g. vector clock)
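A sketch of both merge policies, reusing vc_leq from the vector-clock snippet earlier; records are assumed to be dicts with a ts field and a tombstone flag, per the section above (the field names are mine):

def lww_merge(a, b):
    """Last writer wins: keep the record with the larger total-order
    timestamp; the loser (possibly a real write) is silently dropped."""
    return a if a["ts"] >= b["ts"] else b

def mvr_merge(a, b):
    """Multi-value register: timestamps are vector clocks (partial order).
    If one write dominates the other, keep it; if concurrent, keep both."""
    if vc_leq(a["ts"], b["ts"]):
        return [b]
    if vc_leq(b["ts"], a["ts"]):
        return [a]
    return [a, b]                 # concurrent: surface both siblings

# a deletion is just a write of {"ts": ..., "deleted": True} (a tombstone),
# so it participates in the same merge instead of vanishing.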
Quorums
- Read-after-write consistency
  - a read (read quorum) / write (write quorum) takes its result as soon as responses arrive from the required number of nodes, where
    read quorum + write quorum > nodes
  - typically both quorums are a majority:
    (nodes + 1) / 2, rounded up
  - a write can then tolerate (nodes - write quorum) failed nodes, and a read (nodes - read quorum)
  - (worked example after this list)
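The arithmetic, worked for a five-node system:

n, w, r = 5, 3, 3    # 5 replicas, write quorum 3, read quorum 3
assert r + w > n     # every read quorum overlaps every write quorum
# so a read always includes at least one node holding the latest acknowledged
# write, and writes/reads each tolerate n - w = n - r = 2 unreachable nodes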
- Use total order broadcast (every process sees the msgs in the same order!!)
  - State machine replication (sketched after this list)
    - send update msgs over FIFO-total order broadcast: they always arrive, in the same order
      - could a weaker broadcast do?
        - then the ordering guarantee is gone!!
          - which is fine if the order of updates does not affect the final result
            - commutative:
              f(g(x)) = g(f(x))
    - so treat each update msg as a function and the replica as a DFA (a deterministic state machine)
      - same input, same output: deterministic
    - restrictions
      - cannot apply an update immediately; must wait for the msg to be delivered
      - needs fault-tolerant total order broadcast
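State machine replication in miniature; on_deliver is assumed to be invoked by the total order broadcast layer:

class Replica:
    def __init__(self, initial_state, apply_update):
        self.state = initial_state
        self.apply_update = apply_update   # deterministic, or replicas diverge

    def on_deliver(self, update):          # called in total-order delivery order
        self.state = self.apply_update(self.state, update)

Because every replica applies the same deterministic updates in the same order, all replicas pass through the same sequence of states.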
Consensus
- Fault-tolerant total order broadcast
  - total order broadcast requires a leader!!
    - what if the leader fails?
      - the nodes elect one themselves
        - use a failure detector (timeout) to decide whether the leader has failed
          - if so, elect the next one
            - must make sure there is only one leader
              - use terms to distinguish, within each term, who the voters and the leader are
                - requires defining a quorum (a majority: more than half)
                - each process may vote at most once per term
                  - this guarantees at most one leader per term
                - the leader must collect acks from a quorum before delivering each msg, otherwise things get awkward (it may have been deposed without noticing)
  - Consensus and total order broadcast
    - Consensus: everyone agrees on some single value
    - total order broadcast: everyone agrees on which msg to deliver next
    - Consensus and total order broadcast are formally equivalent
  - Common consensus algorithms: Paxos, Multi-Paxos, Raft
- Distributed mutual exclusion
  - approaches (a voting sketch follows this list)
    - central lock server
      - the leader is a bottleneck
      - single point of failure
        - how to re-elect the leader?
    - token passing
      - pass a token around (the nodes must form a ring); holding the token means holding the lock
      - single points of failure
        - how to rebuild the ring?
        - how to regenerate a lost token?
    - Totally ordered multicast
      - only one node may hold the lock, so everyone votes; a requester needs N-1 votes
        - (Raft only needs (N+1)/2)
      - concurrent requests (two or more want the lock)
        - break the tie by comparing pids
          - the dining philosophers problem!!
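A sketch of the voting approach in Ricart-Agrawala style, one concrete way to realise the totally ordered multicast idea above (send and enter_critical_section are placeholder primitives): a node enters only after all N-1 peers reply OK, and concurrent requests are ordered by (Lamport timestamp, pid).

class VotingMutex:
    def __init__(self, pid, peers):
        self.pid, self.peers = pid, peers   # peers: the other N-1 node ids
        self.clock = 0                      # Lamport clock for tie-breaking
        self.pending = None                 # (ts, pid) of my open request
        self.votes = set()
        self.deferred = []                  # requests answered after release

    def request_lock(self):
        self.clock += 1
        self.pending = (self.clock, self.pid)
        for p in self.peers:
            send(p, ("request", self.pending))

    def on_request(self, sender, req):
        self.clock = max(self.clock, req[0]) + 1
        if self.pending is not None and self.pending < req:
            self.deferred.append(sender)    # my request wins the tie: reply later
        else:
            send(sender, ("ok", self.pid))

    def on_ok(self, sender):
        self.votes.add(sender)
        if len(self.votes) == len(self.peers):   # all N-1 votes collected
            enter_critical_section()

    def release_lock(self):
        self.pending, self.votes = None, set()
        for p in self.deferred:             # now the deferred requesters may go
            send(p, ("ok", self.pid))
        self.deferred = []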
- Consensus system models
  - the assumed system model is
    - partially synchronous
      - not asynchronous (FLP result)
        - in an asynchronous crash-stop system model
          - there is no deterministic consensus algorithm that is guaranteed to terminate
      - clocks are only used
        - for timeouts / failure detection
          - to ensure progress (liveness)
          - never for safety (correctness)
    - crash-recovery
- Raft
  - state (role) transitions: follower <-> candidate -> leader
  - components
    - log: the msgs the leader has sent, or the follower has received (an array)
      - each entry records its msg together with
        - term: the term when the msg was appended
    - term: the election term
      - the main thing every rule compares
    - sentLength: how much of the log the leader has sent to each follower
    - ackedLength: how much log each follower has acknowledged
    - commitLength: how much has actually been delivered
# A Python rendering of the Raft pseudocode from the lecture notes.
# Assumptions: `nodes` is a global list of node ids; _send, _deliver,
# _forward_to_leader and the election-timer functions are placeholders
# for the real network/timer primitives.
from collections import namedtuple

Entry = namedtuple("Entry", ["msg", "term"])  # one log entry

def majority():
    return len(nodes) // 2 + 1                # quorum: more than half

class Node:
    def __init__(self, node_id):
        self.init_runtime_state()
        # persistent state: must survive crash-recovery
        self.Term = 0
        self.votedFor = None
        self.log = []
        self.commitLength = 0
        self.id = node_id

    def init_runtime_state(self):
        # volatile state: rebuilt after a crash
        self.Role = "follower"
        self.Leader = None
        self.votesReceived = set()
        self.sentLength = {n: 0 for n in nodes}
        self.ackedLength = {n: 0 for n in nodes}

    def when_leader_fail_OR_election_timeout(self):
        self.Term += 1
        self.Role = "candidate"
        self.votedFor = self.id
        self.votesReceived = {self.id}
        lastTerm = self.log[-1].term if self.log else 0
        for n in nodes:
            _send(n, ("VoteRequest", self.id, self.Term, len(self.log), lastTerm))
        _startElectionTimer()

    def when_recv_VoteRequest(self, cId, cTerm, cLogLen, cLogTerm):
        myLogTerm = self.log[-1].term if self.log else 0
        # only vote for a candidate whose log is at least as up to date as ours
        isLargerLogTerm = cLogTerm > myLogTerm
        inSameLogTerm = cLogTerm == myLogTerm
        hasMoreLog = cLogLen >= len(self.log)
        logOK = isLargerLogTerm or (inSameLogTerm and hasMoreLog)
        # at most one vote per term
        isLargerTerm = cTerm > self.Term
        inSameTerm = cTerm == self.Term
        notVoted_OR_votedSame = self.votedFor in {cId, None}
        termOK = isLargerTerm or (inSameTerm and notVoted_OR_votedSame)
        if logOK and termOK:
            self.Term, self.Role, self.votedFor = cTerm, "follower", cId
            _send(cId, ("VoteResponse", self.id, self.Term, True))
        else:
            _send(cId, ("VoteResponse", self.id, self.Term, False))

    def when_recv_VoteResponse(self, vId, vTerm, isAgree):
        if self.Role == "candidate" and self.Term == vTerm and isAgree:
            self.votesReceived.add(vId)
            if len(self.votesReceived) >= majority():
                self.Role, self.Leader = "leader", self.id
                _stopElectionTimer()
                for n in nodes:
                    if n != self.id:
                        self.sentLength[n], self.ackedLength[n] = len(self.log), 0
                        self.copyLogTo(n)
        elif vTerm > self.Term:
            self.Role, self.Term, self.votedFor = "follower", vTerm, None
            _stopElectionTimer()

    def broadcast(self, msg):
        if self.Role == "leader":
            self.log.append(Entry(msg, self.Term))
            self.ackedLength[self.id] = len(self.log)
            for n in nodes:
                if n != self.id:
                    self.copyLogTo(n)
        else:
            _forward_to_leader(msg)

    def periodically_do(self):
        # doubles as heartbeat and as retransmission
        if self.Role == "leader":
            for n in nodes:
                if n != self.id:
                    self.copyLogTo(n)

    def copyLogTo(self, n):
        i = self.sentLength[n]          # we assume n's log matches ours up to i
        diffLogs = self.log[i:]
        prevLogTerm = self.log[i-1].term if i > 0 else 0
        _send(n, ("LogRequest", self.id, self.Term, i, prevLogTerm,
                  self.commitLength, diffLogs))

    def when_recv_LogRequest(self, lId, lTerm, logStart, prevLogTerm, lCommitLen, diffLogs):
        if lTerm > self.Term:
            self.Term, self.votedFor = lTerm, None
        if lTerm == self.Term:
            self.Role, self.Leader = "follower", lId
        # our log must contain the prefix the leader assumes, with a matching term
        logOK = len(self.log) >= logStart and (
            logStart == 0 or self.log[logStart-1].term == prevLogTerm)
        if lTerm == self.Term and logOK:
            self.patchDiff(logStart, lCommitLen, diffLogs)
            ack = logStart + len(diffLogs)
            _send(lId, ("LogResponse", self.id, self.Term, ack, True))
        else:
            _send(lId, ("LogResponse", self.id, self.Term, 0, False))

    def patchDiff(self, start, lCommitLen, diff):
        # shrink log: drop our suffix if it conflicts with the leader's entries
        if diff and len(self.log) > start and self.log[start].term != diff[0].term:
            self.log = self.log[:start]
        # append whatever we do not have yet
        if start + len(diff) > len(self.log):
            self.log += diff[len(self.log) - start:]
        # deliver newly committed entries
        if lCommitLen > self.commitLength:
            for msg, _ in self.log[self.commitLength:lCommitLen]:
                _deliver(msg)
            self.commitLength = lCommitLen

    def when_recv_LogResponse(self, fId, fTerm, ack, ok):
        if self.Term == fTerm and self.Role == "leader":
            if ok and ack >= self.ackedLength[fId]:
                self.sentLength[fId] = self.ackedLength[fId] = ack
                self.commitLogEntries()
            elif self.sentLength[fId] > 0:
                # we assumed too long a common prefix: back off one entry, retry
                self.sentLength[fId] -= 1
                self.copyLogTo(fId)
        elif fTerm > self.Term:
            self.Term, self.Role, self.votedFor = fTerm, "follower", None

    def commitLogEntries(self):
        acks = lambda l: sum(1 for n in nodes if self.ackedLength[n] >= l)
        newCommitLen = max((l for l in range(1, len(self.log) + 1)
                            if acks(l) >= majority()), default=-1)
        # Raft's commit rule: only commit entries appended in the current term
        if newCommitLen > self.commitLength and self.log[newCommitLen-1].term == self.Term:
            for msg, _ in self.log[self.commitLength:newCommitLen]:
                _deliver(msg)
            self.commitLength = newCommitLen
Replica consistency
- "consistency" means different things in different contexts
  - ACID
    - the DB moves from one consistent state to another by running a transaction
      - consistent: satisfying application-specific invariants
  - Read-after-write consistency
  - Replication: every replica should be consistent
    - in the same state? measured from when?
    - reads should all return the same result
- Atomic commit
  - an ACID transaction
    - either commits or aborts
      - commit: durable (visible to everything afterwards)
      - abort: no visible side effects
    - with several DBs, it must still be either commit everywhere or abort everywhere
  - Two-phase commit
    - what if a process dies while waiting for the coordinator's commit-or-abort decision?
      - it can only wait for the coordinator to come back
  - Fault-tolerant two-phase commit
    - the commit msg carries the full set of replicas involved
      - so whoever notices that an involved replica has died can initiate abort
      - built on total order broadcast
    - then wait for the votes to return
      - all ok: commit
      - anything wrong: abort
- Linearizability (strong consistency)
  - atomic operations across multiple nodes
  - every operation returns the most recent result
    - this is not happens-before
      - a get after a set must see the set's result (linearizability)
      - even when client1 and client2 never send/recv to each other (no happens-before)
        - so Lamport clocks cannot express it!!
        - only physical clocks can!!
  - Operations overlapping in time
    - their relative order does not matter; the point is that results become visible
  - Serializability & Linearizability
    - Linearizability: everyone gets the most recent result (cf. cache coherence)
    - Serializability: concurrent transactions behave as if run in some serial order (cf. a memory model)
  - (in the slides' example) client2 and client3 get the wrong results: not linearizable!!
  - techniques (see the sketch after this list)
    - linearizable get
      - quorum read
    - linearizable set
      - blind write to a quorum
  - Linearizable compare-and-swap (CAS)
    - via total order broadcast
  - advantages
    - the distributed system stops feeling distributed
      - which makes it simple to use
  - Downsides
    - Performance: lots of msgs and lots of waiting
    - Scalability: needs a leader
    - Availability: if a quorum is unreachable, nothing can be done
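A sketch of the quorum-read technique, with the read-repair step that keeps it linearizable; rep.read/rep.write are hypothetical RPC stubs, and waiting for the first quorum of responses is simplified here to slicing:

def linearizable_get(key, replicas, quorum):
    responses = [rep.read(key) for rep in replicas[:quorum]]  # quorum read
    ts, value = max(responses)           # newest (timestamp, value) pair
    for rep in replicas[:quorum]:        # read repair: write the winner back,
        rep.write(key, ts, value)        # so no later read can go backwards
    return value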
- Eventual consistency
  - The CAP theorem
    - when the network can partition, you can only guarantee one of
      - Consistent (linearizable)
        - wait out the partition
      - Available
        - do not wait; serve your own possibly-stale data
  - eventual consistency: if no further updates arrive, every replica eventually ends up in the same state
  - Strong eventual consistency
    - Properties
      - Convergence: replicas that have applied the same set of updates (order irrelevant) end up in the same state
      - Eventual delivery: if any replica gets an update, eventually every replica gets it
    - no waiting required
      - causal broadcast (or weaker) suffices for spreading updates
    - concurrent updates => fine, as long as conflicts can be resolved
Concurrency control in applications
Conflicts due to concurrent updates
- two solutions below
Conflict-free Replicated Data Types (CRDTs)
- essentially a dict with timestamps
- two flavours
  - Operation-based
    - transmits the actions (e.g. set operations)
    - reliable broadcast (must arrive, but any order will do)
      - so the actions (set operations) must be commutative
    - typically has smaller messages
    - example: operation-based text CRDT
      - init & read
        - elementAt is just array[i], implemented on top of a set
      - positions live in an interval; insert bisects between neighbours
      - delete removes the tuple from the state
      - needs causal broadcast
        - an insert must arrive before the delete that targets it
  - State-based (merge sketched after this list)
    - transmits the state (all the values)
    - best-effort broadcast (fine if it does not arrive)
      - reliable broadcast guaranteed arrival; now that is gone, so the merge must be
        - Idempotent
          - this covers what reliable broadcast used to handle
        - Commutative
          - though now over many states at once (operation-based merges two at a time)
        - Associative
    - can tolerate message loss/duplication
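A minimal state-based merge for the "dict with timestamps" above: per key, keep the entry with the larger timestamp. The merge is idempotent, commutative, and associative, which is exactly why lossy, duplicating best-effort broadcast suffices:

def merge(state_a, state_b):
    out = dict(state_a)
    for key, (ts, value) in state_b.items():
        if key not in out or out[key][0] < ts:
            out[key] = (ts, value)       # later write wins per key
    return out

# merge(x, x) == x (idempotent); merge order and grouping do not matter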
Operational Transformation (OT)
- record the operations, so they can be transformed and recombined later when replicas diverge
Consistent snapshots
- whatever happened earlier (transactions included) is visible to whatever runs later (transactions included)
  - consistent with causality
    - every transaction must be consistent with causality
    - linearizability additionally depends on real-time order
      - so the clock uncertainty must be compensated for
- approach: multi-version concurrency control (MVCC), sketched below
  - every write creates a new version with its own timestamp
  - a read-only transaction is just a point in time
  - a read takes the version closest to, but not after, its own timestamp
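The read rule in code; versions holds every (write timestamp, value) pair for one object:

def mvcc_read(versions, snapshot_ts):
    visible = [(ts, v) for ts, v in versions if ts <= snapshot_ts]
    return max(visible)[1] if visible else None  # newest version <= snapshot

# mvcc_read([(1, "a"), (5, "b"), (9, "c")], snapshot_ts=7) == "b"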
snapshot of system-wide state
- Consistent cuts
  - a cut partitions the events
  - every event inside the cut has its happens-before predecessors inside too
    - except the starting events :-)
    - that closure is what makes the cut consistent
  - equivalently, an event on the left-hand side is either
    - a send
    - a recv whose send is also in the cut (on the left-hand side)
  - (in the example) f is a recv whose send is not in the cut, so that cut is not consistent
- collecting the snapshot of system-wide state
  - collect each local state exactly where the consistent cut has advanced to
  - the local states can then be assembled into a global state
  - how to represent the cut?
    - flood a marker msg in causal order (sketched after this list)
      - on receiving a marker, record the local state
        - except that the channel the marker arrived on needs no recording
          - because the previous process already recorded everything before it
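A sketch of the marker rule in Chandy-Lamport style (which assumes FIFO channels; the notes phrase the flooding in causal order). capture_local_state and the channel objects are hypothetical:

class SnapshotNode:
    def __init__(self, outgoing_channels):
        self.recording = False
        self.local_snapshot = None
        self.outgoing = outgoing_channels
        self.marked = set()                  # channels whose marker has arrived

    def on_marker(self, channel):
        if not self.recording:               # first marker: snapshot right now
            self.recording = True
            self.local_snapshot = capture_local_state()
            for ch in self.outgoing:         # flood the marker onwards
                ch.send("MARKER")
        self.marked.add(channel)             # nothing after this marker counts:
                                             # the sender already recorded its side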
Ref
- Linearizability versus Serializability
- Distributed Systems (a great resource; if the slides are unclear there are also lecture notes)
- Lecture 13: Vector clocks, consistent cuts, process groups, and distributed mutual exclusion