无锁队列

CAS原子操作

CAS英文全称Compare & Set(Swap),也就是原子操作,执行过程中不能被中断,在X86中有相应的汇编指令CMPCHG来实现。
CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。

例如:

1
int val = 10; // V = 0x423D80
  • 线程1想要把val的值加一。在当前状态下,1得到的val = 10,期望值为11。即A = 10, B = 11;
  • 此时线程2切换时间片,将val更新为11;
  • 线程1开始提交更新,首先进行比较,A = 10 != val = 11,提交失败,此时线程1重新获取内存地址V的当前值,A = 11, B = 12,并且重新开始比较。
  • 如果相等,那么就把地址V的值替换为B = 12。

从思想上看,CAS属于乐观锁。

在c中的实现如下:

1
2
3
4
5
6
7
8
int compare_and_swap (int* reg, int oldval, int newval)
{
int old_reg_val = *reg;
if (old_reg_val == oldval) {
*reg = newval;
}
return old_reg_val;
}

在c++也可以实现为返回bool类型

1
2
3
4
5
6
7
8
bool compare_and_swap (int *addr, int oldval, int newval)
{
if ( *addr != oldval ) {
return false;
}
*addr = newval;
return true;
}

CAS的缺点主要有:

  1. CPU开销较大
    在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很大的压力。
  2. 不能保证代码块的原子性
    CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了。
  3. ABA问题
    如果另一个线程修改V值假设原来是A,先修改成B,再修改回成A。当前线程的CAS操作无法分辨当前V值是否发生过变化。

无锁队列

  1. 初始化队列
    1
    2
    3
    4
    5
    6
    InitQueue(Q)
    {
    node = new node()
    node->next = NULL;
    Q->head = Q->tail = node;
    }
  2. 入队使用CAS实现
1
2
3
4
5
6
7
8
9
10
11
12
EnQueue(Q, data) //进队列
{
//准备新加入的结点数据
n = new node();
n->value = data;
n->next = NULL;
do {
p = Q->tail; //取链表尾指针的快照
} while( CAS(p->next, NULL, n) != TRUE);
//while条件注释:如果没有把结点链在尾指针上,再试
CAS(Q->tail, p, n); //置尾结点 tail = n;
}

我们可以看到,程序中的那个 do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新结点 n 加到队尾。如果不成功,则重新再来一次!

就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

但是你会看到,为什么我们的“置尾结点”的操作(第13行)不判断是否成功,因为:

  1. 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
  2. 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
  3. 直到T1线程更新完 tail 指针,于是其它的线程中的某个线程就可以得到新的 tail 指针,继续往下走了。
  4. 所以,只要线程能从 while 循环中退出来,意味着,它已经“独占”了,tail 指针必然可以被更新。
    这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    EnQueue(Q, data) //进队列改良版 v1
    {
    n = new node();
    n->value = data;
    n->next = NULL;
    p = Q->tail;
    oldp = p
    do {
    while (p->next != NULL)
    p = p->next;
    } while( CAS(p.next, NULL, n) != TRUE); //如果没有把结点链在尾上,再试
    CAS(Q->tail, oldp, n); //置尾结点
    }

我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而且,如果一个线程不断的EnQueue,会导致所有的其它线程都去 fetch 他们的 p 指针到队尾,能不能不要所有的线程都干同一个事?这样可以节省整体的时间?

比如:直接 fetch Q->tail 到队尾?因为,所有的线程都共享着 Q->tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了,于是,我们的代码可以改进成如下的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
EnQueue(Q, data) //进队列改良版 v2 
{
n = new node();
n->value = data;
n->next = NULL;

while(TRUE) {
//先取一下尾指针和尾指针的next
tail = Q->tail;
next = tail->next;

//如果尾指针已经被移动了,则重新开始
if ( tail != Q->tail ) continue;

//如果尾指针的 next 不为NULL,则 fetch 全局尾指针到next
if ( next != NULL ) {
CAS(Q->tail, tail, next);
continue;
}

//如果加入结点成功,则退出
if ( CAS(tail->next, next, n) == TRUE ) break;
}
CAS(Q->tail, tail, n); //置尾结点
}

好了,我们解决了EnQueue,我们再来看看DeQueue的代码:

1
2
3
4
5
6
7
8
9
10
DeQueue(Q) //出队列
{
do{
p = Q->head;
if (p->next == NULL){
return ERR_EMPTY_QUEUE;
}
while( CAS(Q->head, p, p->next) != TRUE );
return p->next->value;
}

我们可以看到,DeQueue的代码操作的是 head->next,而不是 head 本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head 和 tail 都指向同一个结点的问题,这样 EnQueue 和 DeQueue 要互相排斥了。

但是,如果 head 和 tail 都指向同一个结点,这意味着队列为空,应该返回 ERR_EMPTY_QUEUE,但是,在判断 p->next == NULL 时,另外一个EnQueue操作做了一半,此时的 p->next 不为 NULL了,但是 tail 指针还差最后一步,没有更新到新加的结点,这个时候就会出现,在 EnQueue 并没有完成的时候, DeQueue 已经把新增加的结点给取走了,此时,队列为空,但是,head 与 tail 并没有指向同一个结点。
虽然,EnQueue的函数会把 tail 指针置对,但是,这种情况可能还是会导致一些并发问题,所以,严谨来说,我们需要避免这种情况。于是,我们需要加入更多的判断条件,还确保这个问题。下面是相关的改进代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
DeQueue(Q) //出队列,改进版
{
while(TRUE) {
//取出头指针,尾指针,和第一个元素的指针
head = Q->head;
tail = Q->tail;
next = head->next;

// Q->head 指针已移动,重新取 head指针
if ( head != Q->head ) continue;

// 如果是空队列
if ( head == tail && next == NULL ) {
return ERR_EMPTY_QUEUE;
}

//如果 tail 指针落后了
if ( head == tail && next == NULL ) {
CAS(Q->tail, tail, next);
continue;
}

//移动 head 指针成功后,取出数据
if ( CAS( Q->head, head, next) == TRUE){
value = next->value;
break;
}
}
free(head); //释放老的dummy结点
return value;
}