Motivation

Starting from a cond var bug

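I once had a program that started two threads, one of which would get stuck, so the thread that finished used a cond var to notify the main thread to kill the other one.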

auto f = []() {
  // something...
  cond_notify();
};

lk.lock();
create_thread();
create_thread();
cond_wait(&lk);  // BUG: a thread may already have called cond_notify()

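This produced a lost wakeup. The reason is simple: a thread can finish, and call notify, before main reaches the wait. The fix is to take the lock around the notify as well. Main holds the lock until cond_wait releases it, so the notify cannot happen before main is waiting and is guaranteed to be seen.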

auto f = []() {
  // something...
  lk.lock();
  cond_notify();
  lk.unlock();
};

lk.lock();       // held until cond_wait releases it
create_thread();
create_thread();
cond_wait(&lk);  // unlocks lk and sleeps atomically

Spurious wakeup

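The version above looks OK, but a cond wait can actually return at any moment, so main may wake up even though no thread has done its work yet. So the wait needs a condition.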

auto f = []() {
  // something...
  lk.lock();
  flag = true;
  cond_notify();
  lk.unlock();
};

lk.lock();
create_thread();
create_thread();
while (!flag)      // re-check the condition after every wakeup
  cond_wait(&lk);

sleep in xv6

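xv6's sleep takes a lock as an argument (and wakeup is called with that lock held). The reason is the same as above: the check and the sleep have to be atomic so that the waiter sees the right data.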

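Below is a semaphore built on xv6's sleep and wakeup. It has a problem: after P has seen s->count == 0 but before it reaches sleep, someone else can call wakeup, and that wakeup signal is lost.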

void
V(struct semaphore *s)
{
  acquire(&s->lock);
  s->count += 1;
  wakeup(s);
  release(&s->lock);
}

void
P(struct semaphore *s)
{
  while(s->count == 0)
    sleep(s);
  acquire(&s->lock);
  s->count -= 1;
  release(&s->lock);
}

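But we can't simply move P's acquire earlier, because that deadlocks right away: P goes to sleep while still holding the lock, so V blocks on acquire and can never wake it up.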

void
V(struct semaphore *s)
{
  acquire(&s->lock);
  s->count += 1;
  wakeup(s);
  release(&s->lock);
}

void
P(struct semaphore *s)
{
  acquire(&s->lock);
  while(s->count == 0)
    sleep(s);
  s->count -= 1;
  release(&s->lock);
}

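So sleep has to release the lock atomically as it goes to sleep. pthread cond vars work the same way. It is also something to watch out for with distributed locks: taking the lock and setting its timeout must happen as one atomic step.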

void
V(struct semaphore *s)
{
  acquire(&s->lock);
  s->count += 1;
  wakeup(s);
  release(&s->lock);
}

void
P(struct semaphore *s)
{
  acquire(&s->lock);
  while(s->count == 0)
    sleep(s, &s->lock);
  s->count -= 1;
  release(&s->lock);
}

What if it's not the same lock? 4-way deadlock

I ran into this while writing Raft. The code below is borrowed directly from the TA.

func (a *App) RPC(args interface{}, reply interface{}) {
    // ...
    a.mutex.Lock()
    i := a.raft.Start(args)
    // update some data structure so that apply knows to poke us later
    a.mutex.Unlock()
    // wait for apply to poke us
    return
}
func (r *Raft) Start(cmd interface{}) int {
    r.mutex.Lock()
    // do things to start agreement on this new command
    // store index in the log where cmd was placed
    r.mutex.Unlock()
    return index
}
func (a *App) apply(index int, cmd interface{}) {
    a.mutex.Lock()
    switch cmd := cmd.(type) {
    case GetArgs:
        // do the get
    // see who was listening for this index
    // poke them all with the result of the operation
    // ...
    }
    a.mutex.Unlock()
}
func (r *Raft) AppendEntries(...) {
    // ...
    r.mutex.Lock()
    // ...
    for r.lastApplied < r.commitIndex {
      r.lastApplied++
      r.app.apply(r.lastApplied, r.log[r.lastApplied])
    }
    // ...
    r.mutex.Unlock()
}

  1. AppendEntries wants to call apply
  2. The app calls Start through RPC

This deadlocks:

  1. AppendEntries holds the raft lock and wants the app lock
  2. RPC holds the app lock and wants the raft lock (in Start)

My earlier Raft did the apply directly through a channel, but the tester's channel is unbuffered!! So a send blocks until the tester receives it.

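The tester wants to call Start while Raft wants to push into the channel, so they deadlock. In the end I added a buffered channel that holds the messages to be committed and let another thread do the commit.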