A queue is a very common data structure that allows removal (dequeue) only at the front of the list (the head) and insertion (enqueue) only at the back (the tail). Like the stack, a queue is a linear list with restricted operations: the end that accepts insertions is called the tail, and the end that serves deletions is called the head.
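The first-in-first-out behavior can be sketched with a plain Go slice (a minimal single-threaded illustration of my own, not one of the concurrent implementations discussed below):

```go
package main

import "fmt"

// enqueue appends v at the tail of the queue.
func enqueue(q []int, v int) []int {
	return append(q, v)
}

// dequeue removes and returns the value at the head of the queue.
// It assumes the queue is non-empty.
func dequeue(q []int) (int, []int) {
	return q[0], q[1:]
}

func main() {
	var q []int
	q = enqueue(q, 1)
	q = enqueue(q, 2)
	q = enqueue(q, 3)
	v, q := dequeue(q)
	fmt.Println(v, q) // first in, first out: prints 1 [2 3]
}
```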
When a queue is used in a concurrent environment, multi-threaded reads and writes must be taken into account: there may be multiple threads enqueueing and multiple threads dequeueing at the same time. In this case we want to guarantee that no data is lost or duplicated, and that the queue's semantics are preserved: first in, first out, and as long as the queue holds data, it can be dequeued.
Admittedly, concurrent access to a queue can be protected with a mutex. A queue is generally implemented with pointers and only operates at its head and tail, so the critical section protected by the mutex has no complex execution logic and completes quickly; in general, a mutex-based queue is therefore already quite efficient. In some cases, however, a lock-free algorithm can further improve the performance of a concurrent queue.
This article introduces some background on lock-free queue algorithms, implements three concurrent queues, and presents performance test results.
The code can be found on GitHub: smallnest/queue
lock-free queue algorithm
Speaking of lock-free queue algorithms, we have to mention Maged M. Michael and Michael L. Scott's 1996 paper Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms. It reviews earlier implementations of concurrent queues and their limitations, proposes a very simple lock-free queue, and also provides a two-lock queue algorithm for machines without CAS instructions. The paper has been cited nearly 1000 times.
It is worth mentioning that Java's ConcurrentLinkedQueue is based on this algorithm:
This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.
Most lock-free algorithms are implemented with CAS (compare-and-swap) operations.
The paper provides pseudo-code for the lock-free queue algorithm. It is quite small, so it can easily be implemented in various programming languages. I have reproduced the pseudo-code here:
structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
structure node_t {value: data type, next: pointer_t}
structure queue_t {Head: pointer_t, Tail: pointer_t}
initialize(Q: pointer to queue_t)
node = new_node() // Allocate a free node
node->next.ptr = NULL // Make it the only node in the linked list
Q->Head.ptr = Q->Tail.ptr = node // Both Head and Tail point to it
enqueue(Q: pointer to queue_t, value: data type)
E1: node = new_node() // Allocate a new node from the free list
E2: node->value = value // Copy enqueued value into node
E3: node->next.ptr = NULL // Set next pointer of node to NULL
E4: loop // Keep trying until Enqueue is done
E5: tail = Q->Tail // Read Tail.ptr and Tail.count together
E6: next = tail.ptr->next // Read next ptr and count fields together
E7: if tail == Q->Tail // Are tail and next consistent?
// Was Tail pointing to the last node?
E8: if next.ptr == NULL
// Try to link node at the end of the linked list
E9: if CAS(&tail.ptr->next, next, <node, next.count+1>)
E10: break // Enqueue is done. Exit loop
E11: endif
E12: else // Tail was not pointing to the last node
// Try to swing Tail to the next node
E13: CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
E14: endif
E15: endif
E16: endloop
// Enqueue is done. Try to swing Tail to the inserted node
E17: CAS(&Q->Tail, tail, <node, tail.count+1>)
dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
D1: loop // Keep trying until Dequeue is done
D2: head = Q->Head // Read Head
D3: tail = Q->Tail // Read Tail
D4: next = head.ptr->next // Read Head.ptr->next
D5: if head == Q->Head // Are head, tail, and next consistent?
D6: if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7: if next.ptr == NULL // Is queue empty?
D8: return FALSE // Queue is empty, couldn't dequeue
D9: endif
// Tail is falling behind. Try to advance it
D10: CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
D11: else // No need to deal with Tail
// Read value before CAS
// Otherwise, another dequeue might free the next node
D12: *pvalue = next.ptr->value
// Try to swing Head to the next node
D13: if CAS(&Q->Head, head, <next.ptr, head.count+1>)
D14: break // Dequeue is done. Exit loop
D15: endif
D16: endif
D17: endif
D18: endloop
D19: free(head.ptr) // It is safe now to free the old node
D20: return TRUE // Queue was not empty, dequeue succeeded
initialize creates the queue with one auxiliary (dummy) node as the head, which simplifies the enqueue and dequeue logic.
For enqueue, E1~E3 first create a new node and store the enqueued value in it; the next step is to insert it at the tail of the queue.
E4~E16 is a loop that keeps trying to insert the node into the queue. Under concurrency a CAS may fail, so the thread retries; among the competing threads one will always succeed, which is what makes this a lock-free algorithm.
E5~E6 read the tail pointer and the next node pointed to by the tail. Without concurrency, the tail's next node is NULL. Under concurrency, however, by the time line E7 runs another thread may have appended a new node, or the old tail node may already have been passed, so E7 first checks that the snapshot is still consistent and re-reads if it is not.
If the condition at E8 holds, the tail we read is still the last node, so line E9 appends the new node with a CAS and exits the loop; note that the tail pointer has not been updated at this point.
Otherwise a new node was appended in the meantime, so line E13 tries to swing the tail pointer forward to the next node, and the loop retries.
After the loop the node is definitely in the queue, and E17 tries to swing the tail pointer to the newly inserted node. This CAS may fail because yet another node has been appended, but that does not matter: our node is already linked, it is just no longer the last one, and the thread that appended the later node will move the tail forward itself.
For dequeue, D2~D4 read the head pointer, the tail pointer, and the head's next node, and D5 checks that the head has not changed in the meantime, i.e. no other dequeue has completed since we read it.
D6~D10 handle the case where the head and the tail point to the same node. There are two sub-cases: (1) the queue is empty, so return false because there is nothing to dequeue; (2) a node was just enqueued and the tail has not been swung yet, so help advance the tail and then retry.
Otherwise, D12 reads the value of the first real node before the CAS (the value must be saved first, because after the CAS another dequeue could free the node), then D13 tries to swing the head pointer to that node. On success the value is returned, and that node becomes the new auxiliary head node, which no longer needs to hold data.
Note that the pseudo-code pairs each pointer with a count that is incremented on every successful CAS; this tag avoids the ABA problem when nodes are recycled. In a garbage-collected language such as Go, a node cannot be reused while any thread still holds a reference to it, so the Go implementation below can drop the count.
Implementation
1. lock-free queue
Following the pseudo-code in the paper, we can implement a lock-free queue in Go. Here we represent the pointers as unsafe.Pointer, which is convenient for the CAS operations.
package queue

import (
	"sync/atomic"
	"unsafe"
)

// LKQueue is a lock-free unbounded queue.
type LKQueue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
}

type node struct {
	value interface{}
	next  unsafe.Pointer
}

// NewLKQueue returns an empty queue.
func NewLKQueue() *LKQueue {
	n := unsafe.Pointer(&node{})
	return &LKQueue{head: n, tail: n}
}

// Enqueue puts the given value v at the tail of the queue.
func (q *LKQueue) Enqueue(v interface{}) {
	n := &node{value: v}
	for {
		tail := load(&q.tail)
		next := load(&tail.next)
		if tail == load(&q.tail) { // are tail and next consistent?
			if next == nil {
				if cas(&tail.next, next, n) {
					cas(&q.tail, tail, n) // Enqueue is done. try to swing tail to the inserted node
					return
				}
			} else { // tail was not pointing to the last node
				// try to swing Tail to the next node
				cas(&q.tail, tail, next)
			}
		}
	}
}

// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *LKQueue) Dequeue() interface{} {
	for {
		head := load(&q.head)
		tail := load(&q.tail)
		next := load(&head.next)
		if head == load(&q.head) { // are head, tail, and next consistent?
			if head == tail { // is queue empty or tail falling behind?
				if next == nil { // is queue empty?
					return nil
				}
				// tail is falling behind. try to advance it
				cas(&q.tail, tail, next)
			} else {
				// read value before CAS otherwise another dequeue might free the next node
				v := next.value
				if cas(&q.head, head, next) {
					return v // Dequeue is done. return
				}
			}
		}
	}
}

func load(p *unsafe.Pointer) (n *node) {
	return (*node)(atomic.LoadPointer(p))
}

func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
	return atomic.CompareAndSwapPointer(
		p, unsafe.Pointer(old), unsafe.Pointer(new))
}
2. two-lock queue
The lock-free queue above implements an efficient concurrent queue via CAS. The paper also gives a two-lock algorithm, which can be used on multiprocessors that lack CAS-style atomic instructions.
package queue

import (
	"sync"
)

// CQueue is a concurrent unbounded queue which uses the two-lock concurrent queue algorithm.
type CQueue struct {
	head  *cnode
	tail  *cnode
	hlock sync.Mutex
	tlock sync.Mutex
}

type cnode struct {
	value interface{}
	next  *cnode
}

// NewCQueue returns an empty CQueue.
func NewCQueue() *CQueue {
	n := &cnode{}
	return &CQueue{head: n, tail: n}
}

// Enqueue puts the given value v at the tail of the queue.
func (q *CQueue) Enqueue(v interface{}) {
	n := &cnode{value: v}
	q.tlock.Lock()
	q.tail.next = n // Link node at the end of the linked list
	q.tail = n      // Swing Tail to node
	q.tlock.Unlock()
}

// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *CQueue) Dequeue() interface{} {
	q.hlock.Lock()
	n := q.head
	newHead := n.next
	if newHead == nil {
		q.hlock.Unlock()
		return nil
	}
	v := newHead.value
	newHead.value = nil // the new head becomes the auxiliary node, so drop its value
	q.head = newHead
	q.hlock.Unlock()
	return v
}
3. mutex-based queue
Traditionally, we can also implement a queue with a single mutex plus a slice: a simple queue that does not chase the last bit of performance (in either time or space).
package queue

import "sync"

// SliceQueue is an unbounded queue which uses a slice as its underlying storage.
type SliceQueue struct {
	data []interface{}
	mu   sync.Mutex
}

// NewSliceQueue returns an empty queue.
// You can give an initial capacity n.
func NewSliceQueue(n int) (q *SliceQueue) {
	return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue puts the given value v at the tail of the queue.
func (q *SliceQueue) Enqueue(v interface{}) {
	q.mu.Lock()
	q.data = append(q.data, v)
	q.mu.Unlock()
}

// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *SliceQueue) Dequeue() interface{} {
	q.mu.Lock()
	if len(q.data) == 0 {
		q.mu.Unlock()
		return nil
	}
	v := q.data[0]
	q.data = q.data[1:]
	q.mu.Unlock()
	return v
}
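One caveat of the slice approach: `q.data = q.data[1:]` keeps the dequeued element reachable through the slice's backing array, so it cannot be garbage collected until the array itself is released. A sketch of a variant that clears the slot first (my own modification, not part of the code above):

```go
package main

import (
	"fmt"
	"sync"
)

// SliceQueue2 is the same mutex+slice queue, except that Dequeue nils
// out the vacated slot so the backing array does not retain the value.
type SliceQueue2 struct {
	data []interface{}
	mu   sync.Mutex
}

// Enqueue puts the given value v at the tail of the queue.
func (q *SliceQueue2) Enqueue(v interface{}) {
	q.mu.Lock()
	q.data = append(q.data, v)
	q.mu.Unlock()
}

// Dequeue removes and returns the value at the head of the queue,
// or nil if the queue is empty.
func (q *SliceQueue2) Dequeue() interface{} {
	q.mu.Lock()
	if len(q.data) == 0 {
		q.mu.Unlock()
		return nil
	}
	v := q.data[0]
	q.data[0] = nil // clear the slot so the GC can reclaim the value
	q.data = q.data[1:]
	q.mu.Unlock()
	return v
}

func main() {
	q := &SliceQueue2{}
	q.Enqueue("a")
	q.Enqueue("b")
	fmt.Println(q.Dequeue(), q.Dequeue(), q.Dequeue()) // prints: a b <nil>
}
```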
Here are the results of the performance tests for the three queues:
goos: darwin
goarch: amd64
pkg: github.com/smallnest/queue
BenchmarkQueue/lock-free_queue#4-4 8399941 177 ns/op
BenchmarkQueue/two-lock_queue#4-4 7544263 155 ns/op
BenchmarkQueue/slice-based_queue#4-4 6436875 194 ns/op
BenchmarkQueue/lock-free_queue#32-4 8399769 140 ns/op
BenchmarkQueue/two-lock_queue#32-4 7486357 155 ns/op
BenchmarkQueue/slice-based_queue#32-4 4572828 235 ns/op
BenchmarkQueue/lock-free_queue#1024-4 8418556 140 ns/op
BenchmarkQueue/two-lock_queue#1024-4 7888488 155 ns/op
BenchmarkQueue/slice-based_queue#1024-4 8902573 218 ns/op
Reference: https://colobu.com/2020/08/14/lock-free-queue-in-go/