19 分钟
聊聊并发系列
参考:并发编程网——聊聊并发系列 《Java并发编程实战》
〇、并发技巧清单
1、基础知识技巧
- 可变状态是至关重要的——状态越少,并发越容易
- 尽量声明域为final
- 不可变类型一定是线程安全的
- 封闭有助于管理复杂性
- 用锁来保每个可变对象
- 当保护同一个不可变条件中的所有条件要使用同一个锁
- 在执行复合操作时要持有锁
- 如果从多个线程中访问同一个可变变量,没有同步机制,程序会出现问题
- 不要自作聪明的推断出来不用使用同步
- 在设计构成中要考虑线程安全,或者在文档中明确指明出他不是线程安全的。
- 将同步策略文档化
2、结构化并发应用程序
一、深入分析Volatile的实现原理
1、
(1)概述
Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。它在某些情况下比synchronized的开销更小,本文将深入分析在硬件层面上Inter处理器是如何实现Volatile的,通过深入分析能帮助我们正确的使用Volatile变量。
(2)术语定义
- 共享变量:在多个线程之间能够被共享的变量被称为共享变量。共享变量包括所有的实例变量,静态变量和数组元素。他们都被存放在堆内存中,Volatile只作用于共享变量。
- 内存屏障(Memory Barriers):是一组处理器指令,用于实现对内存操作的顺序限制。
- 缓冲行(Cache line):缓存中可以分配的最小存储单位。处理器填写缓存线时会加载整个缓冲行,需要使用多个主内存读周期。
- 原子操作(Atomic operations):不可中断的一个或一系列操作。
- 缓存行填充(cache line fill)当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个缓存行到适当的缓存(L1,L2,L3的或所有)
- 缓存命中(cache hit)如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理器从缓存中读取操作数,而不是从内存。
- 写命中(write hit)当处理器将操作数写回到一个内存缓存的区域时,它首先会检查这个缓存的内存地址是否在缓存行中,如果存在一个有效的缓存行,则处理器将这个操作数写回到缓存,而不是写回到内存,这个操作被称为写命中。
- 写缺失(write misses the cache)一个有效的缓存行被写入到不存在的内存区域
(3)Volatile的实现原理
在X86-32架构下操作volatile变量会生成一个带有lack前缀的指令,lock前缀的作用为:
- 将当前处理器缓存行的数据会写回到系统内存。
- 这个写回内存的操作会引起在其他CPU里缓存了该内存地址的数据无效。
二、JavaSE1.6中的Synchronized
1、
(1)引言
Synchronized,很多人都会称呼它为重量级锁,但是随着Java SE1.6对Synchronized进行了各种优化之后,有些情况下它并不那么重了,Java SE1.6中为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级锁,以及锁的存储结构和升级过程。
(2)术语定义
- CAS(Compare and Swap):比较并设置,硬件保证这个操作的原子性,一致则修改,不一致则不修改,返回错误
(3)同步的基础
Java中的每一个对象都可以作为锁。
- 对于同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前对象的Class对象。
- 对于同步方法块,锁是Synchonized括号里配置的对象。
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。那么锁存在哪里呢?锁里面会存储什么信息呢?
(4)同步的原理
JVM规范规定JVM基于进入和退出Monitor对象来实现方法同步和代码块同步。代码块同步是使用monitorenter和monitorexit指令实现。
任何对象都有一个 monitor 与之关联,当且一个monitor 被持有后,它将处于锁定状态。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 的所有权,即尝试获得对象的锁。
Java对象头
锁存在Java对象头里。如果对象是数组类型,则虚拟机用3个Word(字宽)存储对象头,如果对象是非数组类型,则用2字宽存储对象头。在32位虚拟机中,一字宽等于四字节,即32bit。 内容如下
- 第一个字:Mark Word 存储对象的hashCode或锁信息等。
- 第二个字:Class Metadata Address 存储到对象类型数据的指针
- 第三个字:Array length 数组的长度(如果当前对象是数组)
32位机器的Mark Word内容
| |25 bit |4bit |1bit是否是偏向锁 |2bit锁标志位 | |-|——-|—–|—————-|————| |无锁状态| 对象的hashCode| 对象分代年龄| 0| 01|
在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。Mark Word可能变化为存储以下4种数据:
锁状态 | 1bit是否是偏向锁 | 2bit锁标志位 |
---|---|---|
指向栈中锁记录的指针(轻量级锁) | x | 00 |
指向互斥量(重量级锁)的指针 | x | 10 |
GC标记 | x | 11 |
偏向锁 | 1 | 01 |
无锁 | 0 | 01 |
锁的升级
Java SE1.6里锁一共有四种状态,无锁状态,偏向锁状态,轻量级锁状态和重量级锁状态。 它会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。
偏向锁
- 大多数情况下锁不存在多线程竞争,而且总是由同一线程多次获得
- 当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID
- 以后该线程在进入和退出同步块时不需要花费CAS操作来加锁和解锁,而只需简单的测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁
- 如果测试成功,表示线程已经获得了锁,
- 如果测试失败,则需要再测试下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁)
- 如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。
- 如果没有设置,则使用CAS竞争锁
偏向锁的撤销
不会主动撤销偏向锁,当别的线程竞争时,而本线程已经结束了,才会撤销锁
轻量级锁
轻量级锁加锁:线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。
轻量级锁解锁:轻量级解锁时,会使用原子的CAS操作来将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。下图是两个线程同时争夺锁,导致锁膨胀的流程图。
锁的优缺点对比
- 偏向锁
- 优点:加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。
- 缺点:如果线程间存在锁竞争,会带来额外的锁撤销的消耗。
- 适用于:适用于只有一个线程访问同步块场景。
- 轻量级锁
- 优点:竞争的线程不会阻塞,提高了程序的响应速度。
- 缺点:如果始终得不到锁竞争的线程使用自旋会消耗CPU。
- 适用于:追求响应时间。同步块执行速度非常快。
- 重量级锁
- 优点:线程竞争不使用自旋,不会消耗CPU。
- 缺点:线程阻塞,响应时间缓慢。
- 适用于:追求吞吐量。同步块执行速度较长。
三、Java线程池的分析和使用
1、
(1)线程池的好处
- 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 第三:提高线程的可管理性。
(2)线程池的创建
我们可以通过ThreadPoolExecutor来创建一个线程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
- corePoolSize(线程池的基本大小)
- runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列
- maximumPoolSize(线程池最大大小)
- ThreadFactory
- RejectedExecutionHandler(饱和策略)
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
- keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。
- TimeUnit(线程活动保持时间的单位)
(3)向线程池提交任务
- execute:无返回值
- submit:返回一个Future
(4)线程池的关闭
shutdown
将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
shutdownNow
遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程。
源码分析
- 线程池内部定义了Worker线程类,其run作用是:在死循环中不断的从任务队列中去除runnable来执行
- 工作线程有一个
Runnable firstTask;
成员变量,线程启动后会执行
- 工作线程有一个
HashSet<Worker>
用于存放工作线程的容器BlockingQueue<Runnable>
任务队列- execute算法:
- 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
- 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
- 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
- 是一种生产者消费者模型
(5)合理的配置线程池
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- CPU密集型任务:配置
cpu数+1
个线程
- CPU密集型任务:配置
- 任务的优先级:高,中和低。
- 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
建议使用有界队列
(6)线程池的监控
- taskCount:线程池需要执行的任务数量。
- completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
- largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
- getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
- getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:
(7)使用线程池可能带来的问题
- 线程饥饿死锁:线程池也是一种资源,多个任务互相等待对方完成的情况下,在线程池数量一定的情况下会发生死锁
- 运行较长时间任务会降低线程池性能
四、深入分析ConcurrentHashMap
1、各种HashMap的特性
(1)HashTable
- 一种线程安全的hash表,所有操作都加了全局锁,并发性较低
- 迭代器使用旧式的
Enumeration
迭代器 - 不允许存在key为null的值
- 不允许存在value为null的值
(2)HashMap
- 非线程安全Hash表
- 允许存在key为null的一条记录
- 允许存在value为null的多条记录
(3)ConcurrentHashMap
- 类似于HashMap
- 线程安全,使用粒度更小的锁——分段锁,保证并发性
2、实现原理
使用Segment对象,将HashEntry数组封装起来,并让Segment继承ReentrantLack
(可重入锁)
- 读的时候不加锁
- 段的数目收到concurrencyLevel并行等级控制,其指等于
2^concurrentLevel
- 写入的时候对每一段加锁
- Hash方式不同
- HashMap使用:entrys[hash]
- ConcurrentHashMap使用
segments[hash高位]
获取到entrys在同entrys[hash低位]
获取到entry
- 扩容与HashMap不同,可以单独对某段进行扩容。
- 求size,不加锁的情况下求两次所有段数目的和,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
五、原子操作的实现原理
3、处理器如何实现
- 处理器自动保证基本内存操作的原子性
- 32位IA-32处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。
4、JAVA如何实现原子操作
在java中可以通过锁和循环CAS(自旋锁)的方式来实现原子操作。
(1)使用循环CAS实现原子操作
自旋cas
private AtomicInteger atomicI = new AtomicInteger(0);
private void safeCount() {
for (;;) {
int i = atomicI.get();
boolean suc = atomicI.compareAndSet(i, ++i);
if (suc) {
break;
}
}
}
问题
- ABA问题
- 循环时间长开销大(自旋)
- 只能保证一个共享变量的原子操作
(2)使用锁机制实现原子操作
锁机制保证了只有获得锁的线程能够操作锁定的内存区域。JVM内部实现了很多种锁机制,有偏向锁,轻量级锁和互斥锁,有意思的是除了偏向锁,JVM实现锁的方式都用到的循环CAS,当一个线程想进入同步块的时候使用循环CAS的方式来获取锁,当它退出同步块的时候使用循环CAS释放锁。详细说明可以参见文章Java SE1.6中的Synchronized
六、ConcurrentLinkedQueue的实现原理
七、Java中的阻塞队列
八、Fork/Join框架介绍
九、Java中的CopyOnWrite容器
写时复制,实现线程安全
原理:读时不加锁,写时加锁(先拷贝一份,然后修改引用),弱一致性,导致读到旧数据
public boolean add(T e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 复制出新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 把新元素添加到新数组里
newElements[len] = e;
// 把原数组引用指向新数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
final void setArray(Object[] a) {
array = a;
}
十、生产者消费者模式
略
十一、StringBuffer和StringBuilder
同样实现字符串拼接,是可变对象,但是
- StringBuffer 是线程安全的
- StringBuilder 是非线程安全的
十二、同步容器和并发容器
1、同步容器
(1)JDK同步容器
- Vector
- Stack
- HashTable
- Collections.synchronized方法生成,例如:
- Collectinons.synchronizedList()
- Collections.synchronizedSet()
- Collections.synchronizedMap()
- Collections.synchronizedSortedSet()
- Collections.synchronizedSortedMap()
(2)同步容器迭代器问题
在迭代过程中别的线程删除容器会抛出 ConcurrentModificationException
。
解决办法,对所迭代代码段加锁。
注意:有些方法隐式的使用了迭代如 toString,hashCode,equalse,containsAll,removeAll,retainAll等方法都会隐式的Iterate,也可能抛出ConcurrentModificationException
2、并发容器
(1)JDK中并发容器
ConcurrentHashMap
对应的非并发容器:HashMap 目标:代替Hashtable、synchronizedMap,支持复合操作 原理:JDK6中采用一种更加细粒度的加锁机制Segment“分段锁”,JDK8中采用CAS无锁算法,详细分析推荐阅读【JDK】:ConcurrentHashMap高并发机制——【转载】。
CopyOnWriteArrayList
对应的非并发容器:ArrayList 目标:代替Vector、synchronizedList 原理:利用高并发往往是读多写少的特性,对读操作不加锁,对写操作,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性,当然写操作的锁是必不可少的了。 关于这一部分可参考【JDK】:CopyOnWriteArrayList、CopyOnWriteArraySet 源码解析
CopyOnWriteArraySet
对应的费并发容器:HashSet 目标:代替synchronizedSet 原理:基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。 关于这一部分可参考【JDK】:CopyOnWriteArrayList、CopyOnWriteArraySet 源码解析
ConcurrentSkipListMap
对应的非并发容器:TreeMap 目标:代替synchronizedSortedMap(TreeMap) 原理:Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过”空间来换取时间”的一个算法。ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。
ConcurrentSkipListSet
对应的非并发容器:TreeSet 目标:代替synchronizedSortedSet 原理:内部基于ConcurrentSkipListMap实现
ConcurrentLinkedQueue
不会阻塞的队列 对应的非并发容器:Queue 原理:基于链表实现的FIFO队列(LinkedList的并发版本)
LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue
对应的非并发容器:BlockingQueue 特点:拓展了Queue,增加了可阻塞的插入和获取等操作 原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒 实现类: LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列 ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列 PriorityBlockingQueue:按优先级排序的队列
十三、构建高效且可伸缩的结果缓存
1、缓存实例
(1)缓存计算结果
使用HashMap+同步块实现
问题:并行性低太低,对于同步快来说是串行的,还带来了锁开销
使用ConcurrentHashMap
并发性更好 问题:没有加锁的话会出现,计算结果计算两遍的问题
使用ConcurrentHashMap储存<参数类型, Future<结果类型>>
使用ConcurrentHashMap储存<参数类型, Future<结果类型>> + ConcurrentHashMap提供的原子操作
十四、线程的取消和关闭
需要中断的原因:
- GUI程序中,用户点击了取消
- 有时间限制的操作
- 应用程序事件。多个搜索线程中某个线程获得了解
- 发生了错误,导致所有线程的执行条件达不到要求,比如爬虫程序中,磁盘已满
- 当程序需要关闭时,所有工作线程需要平滑关闭
1、使用自定义volitale标志量
在线程类中定义一个boolean的volitale变量
问题: 当线程工作中遇到了阻塞(睡眠、等待IO、锁、wait等情况),外界的取消操作无法及时响应
2、使用Thread的中断机制
Thread 中断函数原型
public class Thread {
/*
如果线程在调用 Object 类的 wait()、wait(long) 或 wait(long, int) 方法,或者该类的 join()、join(long)、join(long, int)、sleep(long) 或 sleep(long, int) 方法过程中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException。
*/
public void interrupt(){} //让该线程发生一个中断
public boolean isInterrupted(){} //返回之前的中断状态
public static boolean interrupted(){} //返回之前的中断状态,并清除中断状态
//当,抛出InterruptedException异常后,中断状态自动清除
}
使用例子:
Thread t = new Thread(new Runnable(){
@Override
public void run(){
try{
while(!Thread.currentThread().isInterrupted()){
System.out.println("running...");
Thread.sleep(5000);
}
} catch (InterruptedException e){
System.out.println(Thread.currentThread().isInterrupted());
System.out.println(Thread.interrupted());
System.out.println(Thread.currentThread().isInterrupted());
System.out.println("interrupted...");
//让线程退出
}
}
});
t.start();
Thread.sleep(100);
t.interrupt();
/*输出
running...
false
false
false
interrupted...
*/
十五、J.U.C体系
JUC指的是:java.util.concurrent
包容
参见:https://blog.csdn.net/hp910315/article/details/50963095
十六、并发相关类的模拟
1、信号量的模拟实现
class Semaphore{
private int permits; //许可证的值
public Semaphore(int permits){
this.permits = permits;
}
//P操作
public synchronized void acquire() throws InterruptedException{
if(permits<=0){
wait();
}
permits--;
}
//V操作
public synchronized void release(){
permits++;
notify();
}
}
//测试
Semaphore s = new Semaphore(1);
s.acquire();
new Thread(()->{
try{
Thread.sleep(1000);
} catch (Exception ignored) {}
s.release();
}).start();
s.acquire();
//1秒后退出
2、可重入锁的实现
class ReentrantLock{
private int ref = 0; //引用计数,用于实现可重入锁
private Thread nowThread=null; //持有锁的线程
public synchronized void lock(){
if(ref==0){
nowThread = Thread.currentThread();
ref ++;
} else {
if(nowThread==Thread.currentThread()){
ref ++;
} else {
while(ref!=0){
try{
wait();
}catch (Exception ignored){}
}
nowThread = Thread.currentThread();
ref++;
}
}
}
public synchronized void unlock(){
if(ref>0 && nowThread == Thread.currentThread()) {
ref--;
if(ref==0){
notify();
}
} else {
return;//返回或者抛出异常
}
}
}
public static void main(String[] args) throws Exception {
ReentrantLock lock = new ReentrantLock();
new Thread(()->{
try{
lock.lock();
Thread.sleep(100);
System.out.println(Thread.currentThread().getName()+":持有了锁");
lock.lock();
Thread.sleep(100);
System.out.println(Thread.currentThread().getName()+":重入了锁");
lock.unlock();
System.out.println(Thread.currentThread().getName()+":解除了一重锁");
lock.unlock();
System.out.println(Thread.currentThread().getName()+":解除了所有的锁");
} catch (Exception ignored) {}
}).start();
Thread.sleep(10);
lock.lock();
System.out.println(Thread.currentThread().getName()+":持有了锁");
lock.unlock();
System.out.println(Thread.currentThread().getName()+":解除了锁");
}
/*
输出:
Thread-0:持有了锁
Thread-0:重入了锁
Thread-0:解除了一重锁
Thread-0:解除了所有的锁
main:持有了锁
main:解除了锁
*/
十七、并发程序的测试
1、正确性测试
(1)基本功能测试
在单线程中对基本功能进行测试
(2)对阻塞操作的测试
使用中断实现
- 创建一个子线程,在子线程中调用阻塞方法,并用try catch包裹
- 在调用的下一行执行测试失败的返回函数
- 在主线程启动子线程
- 子线程启动后主线程睡眠一下
- 主线程使子线程线程中断
- 主线程join子线程
(3)安全性测试
- 使用伪随机数生成器测试
- 使用同步工具(CyclicBarrier)使所有线程启动完毕之后在执行测试逻辑
(4)其他测试
- 资源管理测试(使用内存转储)
- 使用回调
- 产生大量的交替操作(使用yield或sleep)
2、性能测试
- 添加运行时间的统计
- 多种算法的比较
- 响应性度量
3、避免性能测试的陷阱
测试可能没有考虑
- 垃圾回收
- 动态编译JIT
- 。。。
4、其他的测试方法
- 代码审查
- 静态分析工具
- 面向方面测试
- 分析与检测工具