KAIST CS492: Design and Analysis of Concurrent Programs

#基本概念

#Progress

A program is making progress if, when the program threads are run for a sufficiently long time, at least one of the threads makes progress (for some sensible definition of progress).

#Make Progress

在多线程编程中,Make Progress 是指线程在全局时间上取得有用的进展。CAS 操作不算进展。

使用锁的问题是可能存在一段时间,程序所有线程都没有 Make Progress。例如,当一个线程拿到锁,然后被调度器切换到其他线程,此时,整个程序就没有任何 Progress。

#Lock-free

| An algorithm is lock-free if, when the program threads are run for a sufficiently long time, at least one of the threads makes progress (for some sensible definition of progress).

Lock-free 表示程序在任意时间内,至少有一个线程能够 Make Progress。

#Wait-free

| An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes.

Wait-free 表示程序在任意时间内,所有线程都能够 Make Progress。

关系: Wait-free ⊆ lock-free ⊆ obstruction-free ⊆ “nonblocking”

#Spinlocks

#Naive Spinlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// A spin lock.
#[derive(Debug)]
pub struct SpinLock {
inner: AtomicBool,
}

unsafe impl RawLock for SpinLock {
type Token = ();

fn lock(&self) {
let backoff = Backoff::new();

while self
.inner
.compare_exchange(false, true, Acquire, Relaxed)
.is_err()
{
backoff.snooze();
}
}

unsafe fn unlock(&self, _token: ()) {
self.inner.store(false, Release);
}
}

unsafe impl RawTryLock for SpinLock {
fn try_lock(&self) -> Result<(), ()> {
self.inner
.compare_exchange(false, true, Acquire, Relaxed)
.map(|_| ())
.map_err(|_| ())
}
}

#Ticket Lock

Ticket Lock 是一种自旋锁,它使用一个全局的计数器来实现锁。每个线程在进入临界区之前,需要先获取一个自增的 ticket,然后 spin 等待另一个计数器达到自己的 ticket 值。

加锁操作:

  1. 线程获取一个自增的 ticket 值。
  2. 线程等待另一个计数器达到自己的 ticket 值。
  3. 线程进入临界区。

解锁操作:

  1. 线程将另一个计数器加 1。
  2. 线程退出临界区。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct TicketLock {
next_ticket: AtomicUsize,
now_serving: AtomicUsize,
}

impl TicketLock {
pub fn new() -> Self {
Self {
next_ticket: AtomicUsize::new(0),
now_serving: AtomicUsize::new(0),
}
}

pub fn lock(&self) {
let ticket = self.next_ticket.fetch_add(1, Relaxed);
while self.now_serving.load(Relaxed) != ticket {
std::hint::spin_loop();
}
}

pub fn unlock(&self) {
let next = self.now_serving.load(Relaxed);
self.now_serving.store(next + 1, Release);
}
}

#MCS Lock

Ticket Lock 有一个问题,就是多个 CPU 同时争抢一把锁时,会导致严重的总线竞争,导致性能下降。

MCS Lock (Mellor-Crummey and Scott locks)[1] 通过使用一个链表来实现锁,每个线程在进入临界区之前,需要先获取一个自增的 ticket,然后 spin 等待另一个线程的锁。

数据结构:

  • MCS 锁实例 ​​: 一条单向链表的指针,初始化为指向一个未拥有的节点。
  • 锁节点 ​​: 每次加锁时创建一个节点,包含 next 指针和 locked 状态。
    • next 指针指向下一个正在等待锁的节点。
    • locked: volitile bool 状态表示当前线程是否正在持有锁。

加锁操作:

  1. 当前线程创建锁节点,next 设为 nullptr。
  2. 通过 lock.getAndSet()将自己加入队列尾部,并获取前驱节点 pred。
  3. 如果 pred 为空,则说明没有线程在排队,直接结束。
  4. 将当前锁节点的 locked 设为 true。
  5. 将 pred->next 指针指向当前节点。
  6. 自旋等待 locked 变成 false。

解锁操作:

  1. 获取当前线程的锁节点 I。
  2. 如果 I->next 为空,则说明没有线程在排队。
    1. 通过 lock.cas(I, nullptr)将队列清空
    2. 如果成功,则直接结束;否则,说明有后继节点过来了
    3. Spin 等待后继线程把 I->next 设置好
  3. 将 I->next->locked 设为 false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type qnode = record
next : ^qnode // 指向下一个正在等待锁的节点
locked : Boolean // 当前线程是否正在持有锁
type lock = ^qnode // initialized to nil

// parameter I, below, points to a qnode record allocated
// (in an enclosing scope) in shared memory locally-accessible
// to the invoking processor

procedure acquire_lock (L : ^lock, I : ^qnode)
I->next := nil
pred : ^qnode := fetch_and_store (L, I) // 节点入队
if pred != nil // 如果之前有线程正在持有锁,说明需要等待
I->locked := true
pred->next := I // 将当前节点加入到前驱节点的next指针
repeat while I->locked // 自旋等待前驱节点释放锁

procedure release_lock (L : ^lock, I: ^qnode)
if I->next = nil // 没有后继节点在排队
if compare_and_store (L, I, nil) // 确实没有后继节点在排队,则结束
return
repeat while I->next = nil // 等待后继节点设置好next指针
I->next->locked := false // 后继节点可以获得锁了

#CLH Lock

CLH Lock (Craig, Landin, and Hagersten locks)[2] 是一种基于 ​​ 单向链表实现的可扩展、高性能、公平的自旋锁,它使用一个链表来实现锁。加锁过程相当于将当前线程加入到队列尾部,并等待前驱节点释放锁。解锁过程相当于将当前节点从队列中移除。

数据结构:

  • CLH 锁实例 ​​: 一条单向链表的指针,初始化为指向一个未拥有的节点。
  • 锁节点 ​​: 每次加锁时创建一个节点,包含 prev 指针和 succ_must_wait 状态。
    • prev 指针指向前一个线程的锁节点。
    • succ_must_wait: volitile bool 状态表示当前线程是否正在持有锁。

加锁操作:

  1. 当前线程创建锁节点,succ_must_wait 设为 true。
  2. 通过 lock.getAndSet()将自己加入队列尾部,并获取前驱节点。
  3. 在前驱节点的 succ_must_wait 上自旋,直到其变为 false。

解锁操作:

  1. 获取前驱节点。
  2. 将当前锁节点的 succ_must_wait 设为 false。
  3. 返回前一个锁节点,用于下一次加锁使用 (前一个锁节点一定不再被使用,这里复用前一个锁节点以避免内存分配和释放)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type qnode = record
prev : ^qnode // 前一个正在等待锁/持有锁的节点
succ_must_wait : Boolean // 当前线程是否正在持有锁
type lock = ^qnode // initialized to point to an unowned qnode

procedure acquire_lock (L : ^lock, I : ^qnode)
I->succ_must_wait := true
pred : ^qnode := I->prev := fetch_and_store (L, I) // 节点入队
repeat while pred->succ_must_wait

procedure release_lock (ref I : ^qnode)
pred : ^qnode := I->prev
I->succ_must_wait := false
I := pred // 返回前一个锁节点,用于下一次加锁使用。

CLH 锁的优缺点:

  • 空间复杂度低,N 个线程,L 个锁,每个线程获取一个锁,则存储空间为O(N+L)
  • NUMA 架构的 CPU 上,性能差。每个 CPU 都有自己的内存,如果前驱节点不在同一个 CPU 内存上,由于内存位置比较远,在自旋判断前驱节点的 locked 属性时,性能很差。不过在 SMP 架构的 CPU 上,不存在这个问题,性能依旧很高。

Java 的 AbstractQueuedSynchronizer(简称为 AQS)就是基于 CLH Lock 思想实现的。

#K42 Lock

K42 Lock 是一种 MCS Lock 的变种,由 IBM K42 Group 提出,避免了加锁时需要多传一个参数的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// Locks and queue nodes use the same data structure:
type lnode = record
next : ^lnode
union
locked : Boolean // for queue nodes
tail : ^lnode // for locks
type lock = lnode

// If threads are waiting for a held lock, next points to the queue node
// of the first of them, and tail to the queue node of the last.
// A held lock with no waiting threads has value <&head, nil>.
// A free lock with no waiting threads has value <nil, nil>.

procedure acquire_lock (L : ^lnode)
I : lnode
loop
predecessor : ^lnode := L->tail
if predecessor = nil
// lock appears not to be held
if compare_and_store (&L->tail, nil, &L->next)
// I have the lock
return
else
// lock appears to be held
I.next := nil
if compare_and_store (&L->tail, predecessor, &I)
// I'm in line
I.locked := true
predecessor->next := &I
repeat while I.locked // wait for lock

// I now have the lock
successor : ^lnode := I.next
if successor = nil
L->next := nil
if ! compare_and_store (&L->tail, &I, &L->next)
// somebody got into the timing window
repeat
successor := I.next
while successor = nil // wait for successor
L->next := successor
return
else
L->next := successor
return

procedure release_lock (L : ^lnode)
successor : ^lnode := L->next
if successor = nil // no known successor
if compare_and_store (&L->tail, &L->next, nil)
return
repeat
successor := L->next
while successor = nil // wait for successor
successor->locked := false

#HCLH Lock

A Hierarchical CLH Queue Lock

#MCS Parking Lock

#总结

  • Non-scalable locks are dangerous.
  • 各种 Scalable Spinlock 的总结伪代码: https://www.cs.rochester.edu/research/synchronization/pseudocode/ss.html

#Lock-based Concurrency

#Sequence Lock

#Lock-free Data Structures

#Treiber Stack

Treiber Stack 是一种无锁栈实现,它通过使用一个链表来实现栈。

Push 操作:

  1. 创建一个新的节点 n,并将其数据初始化。
  2. top = load(top)
  3. n->next = top
  4. CAS(top, top, n)
  5. 如果失败,则回到第 2 步重试。

Pop 操作:

  1. 如果栈为空则返回 nullptr。
  2. top = load(top)
  3. next = top->next
  4. CAS(top, top, next)
  5. 如果 CAS 成功,则返回 top,否则回到第 2 步重试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
impl<T> Stack<T> {
/// Pushes a value on top of the stack.
pub fn push(&self, t: T) {
let mut node = Owned::new(Node {
data: MaybeUninit::new(t),
next: ptr::null(),
});

// SAFETY: We don't dereference any pointers obtained from this guard.
let guard = unsafe { crossbeam_epoch::unprotected() };

let mut head = self.head.load(Relaxed, guard);
loop {
node.next = head.as_raw();

match self
.head
.compare_exchange(head, node, Release, Relaxed, guard)
{
Ok(_) => break,
Err(e) => {
head = e.current;
node = e.new;
}
}
}
}

/// Attempts to pop the top element from the stack.
///
/// Returns `None` if the stack is empty.
pub fn pop(&self) -> Option<T> {
let mut guard = crossbeam_epoch::pin();

loop {
let head = self.head.load(Acquire, &guard);
let h = unsafe { head.as_ref() }?;
let next = Shared::from(h.next);

if self
.head
.compare_exchange(head, next, Relaxed, Relaxed, &guard)
.is_ok()
{
// Since the above `compare_exchange()` succeeded, `head` is detached from
// `self` so is unreachable from other threads.

// SAFETY: We are returning ownership of `data` in `head` by making a copy of it via
// `assume_init_read()`. This is safe as no other thread has access to `data` after
// `head` is unreachable, so the ownership of `data` in `head` will never be used
// again.
let result = unsafe { h.data.assume_init_read() };

// SAFETY: `head` is unreachable, and we no longer access `head`.
unsafe { guard.defer_destroy(head) };

return Some(result);
}

// Repin to ensure the global epoch can make progress.
guard.repin();
}
}

}

#Michael-Scott’s Queue

Michael-Scott’s queue 是一种无锁队列实现,它通过使用一个链表来实现队列。

Enqueue 操作:

  1. 创建一个新的节点 n,并将其数据初始化。
  2. tail = load(tail)
  3. n->next = tail
  4. CAS(tail, tail, n)
  5. 如果失败,则回到第 2 步重试。

Dequeue 操作:和 Treiber Stack 的 Pop 操作一样

#Harris Sorted List


  1. Mellor-Crummey, M. and Scott, M.L. 1981. Algorithms for Scalable Synchronization on Shared-Memory Multiprocessors. ACM TOCS, February 1991. ↩︎

  2. Discovered independently by Travis Craig at the University of Washington (UW TR 93-02-02, February 1993), and by Anders Landin and Eric Hagersten of the Swedish Institute of Computer Science (IPPS, 1994). ↩︎