CAS原子操作
CAS英文全称Compare & Set(Swap),也就是原子操作,执行过程中不能被中断,在X86中有相应的汇编指令CMPCHG来实现。
CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。
例如:
1 | int val = 10; // V = 0x423D80 |
- 线程1想要把val的值加一。在当前状态下,1得到的val = 10,期望值为11。即A = 10, B = 11;
- 此时线程2切换时间片,将val更新为11;
- 线程1开始提交更新,首先进行比较,A = 10 != val = 11,提交失败,此时线程1重新获取内存地址V的当前值,A = 11, B = 12,并且重新开始比较。
- 如果相等,那么就把地址V的值替换为B = 12。
从思想上看,CAS属于乐观锁。
在c中的实现如下:
1 | int compare_and_swap (int* reg, int oldval, int newval) |
在c++也可以实现为返回bool类型
1 | bool compare_and_swap (int *addr, int oldval, int newval) |
CAS的缺点主要有:
- CPU开销较大
在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很大的压力。 - 不能保证代码块的原子性
CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了。 - ABA问题
如果另一个线程修改V值假设原来是A,先修改成B,再修改回成A。当前线程的CAS操作无法分辨当前V值是否发生过变化。
无锁队列
- 初始化队列
1
2
3
4
5
6InitQueue(Q)
{
node = new node()
node->next = NULL;
Q->head = Q->tail = node;
} - 入队使用CAS实现
1 | EnQueue(Q, data) //进队列 |
我们可以看到,程序中的那个 do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新结点 n 加到队尾。如果不成功,则重新再来一次!
就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。
但是你会看到,为什么我们的“置尾结点”的操作(第13行)不判断是否成功,因为:
- 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
- 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
- 直到T1线程更新完 tail 指针,于是其它的线程中的某个线程就可以得到新的 tail 指针,继续往下走了。
- 所以,只要线程能从 while 循环中退出来,意味着,它已经“独占”了,tail 指针必然可以被更新。
这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()1
2
3
4
5
6
7
8
9
10
11
12
13EnQueue(Q, data) //进队列改良版 v1
{
n = new node();
n->value = data;
n->next = NULL;
p = Q->tail;
oldp = p
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p.next, NULL, n) != TRUE); //如果没有把结点链在尾上,再试
CAS(Q->tail, oldp, n); //置尾结点
}
我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而且,如果一个线程不断的EnQueue,会导致所有的其它线程都去 fetch 他们的 p 指针到队尾,能不能不要所有的线程都干同一个事?这样可以节省整体的时间?
比如:直接 fetch Q->tail 到队尾?因为,所有的线程都共享着 Q->tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了,于是,我们的代码可以改进成如下的实现:
1 | EnQueue(Q, data) //进队列改良版 v2 |
好了,我们解决了EnQueue,我们再来看看DeQueue的代码:
1 | DeQueue(Q) //出队列 |
我们可以看到,DeQueue的代码操作的是 head->next,而不是 head 本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head 和 tail 都指向同一个结点的问题,这样 EnQueue 和 DeQueue 要互相排斥了。
但是,如果 head 和 tail 都指向同一个结点,这意味着队列为空,应该返回 ERR_EMPTY_QUEUE,但是,在判断 p->next == NULL 时,另外一个EnQueue操作做了一半,此时的 p->next 不为 NULL了,但是 tail 指针还差最后一步,没有更新到新加的结点,这个时候就会出现,在 EnQueue 并没有完成的时候, DeQueue 已经把新增加的结点给取走了,此时,队列为空,但是,head 与 tail 并没有指向同一个结点。
虽然,EnQueue的函数会把 tail 指针置对,但是,这种情况可能还是会导致一些并发问题,所以,严谨来说,我们需要避免这种情况。于是,我们需要加入更多的判断条件,还确保这个问题。下面是相关的改进代码:
1 | DeQueue(Q) //出队列,改进版 |