Java并发
CPU
现代CPU为了提升执行效率,为了减少CPU与内存交互,在CPU上做了多级缓存,一般是三级缓存
- 一级缓存:数据缓存和指令缓存,逻辑核独占
- 二级缓存:物理核独占,一个物理核心一个L1和L2,逻辑核共享
- 三级缓存:所有物理核共享
CPU寄存器 > L1 > L2 > L3 > 内存
缓存由最小的存储区块——缓存行(缓存由N个缓存行组成)组成,缓存行一般64字节
CPU读的时候依次从L1 L2 L3 内存去找,在内存找到会依次给L3 L2 L1都拷贝一份
空间局部性原则:去内存拿数据时,会把目标相邻的数据也读到缓存
时间局部性原则:一个数据正在被访问,那么近期很可能会被再次访问,缓存不会丢弃
线程上下文切换:线程切换,保存上一个线程的信息,加载下一个线程信息,数据保存在内存Task State Segment(任务状态段)
运行状态切换:内核态,用户态
内存用户空间(保护系统):JVM,软件,360
内存内核空间:OS
CPU运行安全级别
ring0:最高级别,内核态,可以执行很多操作,刷盘,很敏感
ring1
ring2
ring3:用户态
Linux和Windows只用了ring0,ring3两个级别
Java里面线程创建调度和管理由内核完成,内核保存线程的状态和上下文信息。
volatile
Java虚拟机轻量级同步机制
- 保证被volatile修饰的变量对所有线程可见,一个线程修改了共享变量,会及时通知其它线程。(非volatile共享变量在不同线程中修改值,其它线程早晚也会知道,只是不能及时知道变量变化了)
- 每个线程会在自己线程独有保存一个共享变量的值,一个线程修改了共享变量值之后,会通知其它线程值已修改,其它线程丢弃自己的脏数据,或者覆盖脏数据,然后读取新值
- 禁止指令重排,JVM会根据代码逻辑进行智能优化,重排代码执行顺序。增加性能,高并发下可能出问题
Monitor监视器锁
一种同步机制,一个对象。Java对象天生带了Monitor锁,也就是synchronized的对象锁,这些状态都放在对象头Mark Word
任何对象都有一个Monitor与之关联,当一个Monitor被持有后,它就处于锁定状态。依赖OS,-> Mutex互斥量 -> 操作系统维护(要切换内核态)
查看对象内存布局
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>
Object o = new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
synchronized
同步锁,内置锁,隐式锁,保证原子性,可重入
基于进入和退出Monitor对象来实现同步
锁膨胀:JDK1.6后会自适应升级,无锁,偏向锁,轻量级锁,重量级锁
锁消除:JVM检测同步代码不可能存在锁竞争的情况就会清除同步锁。实现基于逃逸分析,比如只在一个方法内执行,不会逃逸出这个方法,那么就不会被其它线程使用。
锁的粗化:将多个连续的加锁,解锁操作 优化成一个大的同步锁
自旋锁:不断循环,尝试获取锁,可以避免一些线程的挂起和恢复(这个过程要用户态转入内核态,比较慢)
- 内核态:CPU可以访问内存所有数据,包括外围设备,例如硬盘,网卡。CPU也可以将自己从一个程序切换到另一个程序
- 用户态:只能受限的访问内存,且不允许访问外围设备,占用CPU的能力被剥夺,CPU资源可以被其他程序获取。
这样区分是为了防止用户进程获取别的程序的内存数据。
Synchronized依赖操作系统实现,使用同步锁需要进行用户态到内核态的切换。简单来说,在JVM中monitor enter和monitor exit字节码是依赖于底层操作系统的Mutex Lock来实现,但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的。
AQS和ReentrantLock
AQS 的全称 AbstractQueuedSynchronizers抽象队列同步器
在Java并发包(java.util.concurrent)中很多锁都是通过AQS来实现加锁和释放锁的过程
ReentrantLock是AQS框架的应用实现,线程同步的手段,具有比synchronized更多特性,支持手动加锁,解锁,支持加锁公平性,可重入
ReentrantLock lock = new ReentrantLock(false); // true公平锁 false非公平锁
lock.lock()
lock.unlock()
加锁,释放锁逻辑
lock.lock() //加锁
while(true){
if(cas加锁成功){
break;
}
HashSet,LikedQueued(),
HashSet.add(Thread)
LikedQueued.put(Thread)
LockSupport.park(); //阻塞
}
// 拿到锁后执行业务逻辑
lock.unlock() //解锁
Thread t= HashSet.get()
Thread t = LikedQueued.take();
LockSupport.unpark(t);
核心实现:自旋,LocksSuport, CAS(加锁,不管有多少线程来,永远要保证只有1个线程能加锁成功)
使用queue队列,原因:要想实现公平和非公平,需要排队,FIFO。就保证了公平性。
- 公平锁:竞争锁需要排队,当可以获取锁时还需判断队列有没有线程排队,没线程就CAS修改state
- 非公平锁:当可以获取锁时就竞争锁(CAS修改state)
AQS包含三个组件
exclusiveOwnerThread 当前获取锁的线程是谁
state 状态器(为0就没有线程持有,无锁状态)
同步等待队列(CHL队列):基于双向链表数据结构的队列FIFO
- 锁竞争:state=0,没人就CAS;state=1,有人就判断当前获取锁的线程是不是自己,是就state+1,不是自己就加锁失败
Semaphore
信号量,控制访问特定资源线程数目,底层依赖AQS的State
可以做服务限流
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2); // 10个线程一次允许2个允许
for (int i = 0;i < 10;i++) {
new Thread(new MyTask(semaphore, "t" + i)).start();
}
}
static class MyTask extends Thread{
Semaphore semaphore;
public MyTask(Semaphore semaphore, String tName) {
super(tName);
this.semaphore = semaphore;
}
@Override
public void run() {
try {
// semaphore.acquire(); // 获取公共资源
if (semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)) { // 如果拿到了执行下面的,拿不到最多等待500ms
System.out.println(Thread.currentThread().getName() + "run");
Thread.sleep(2000); // do something
semaphore.release(); // 释放公共资源
} else {
System.out.println("降级"); // 降级
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Semaphore semaphore = new Semaphore(2); // state初始值2,并行度2,总容量池
semaphore.acquire(); // 每个线程获取1个凭据,state-1(这里可以带参n,获取n个凭据,返还也返还n个)
semaphore.release(); // 凭据归还,state+1
semaphore.tryAcquire(500, TimeUnit.MILLISECONDS) // 尝试获取,拿不到最多等待500ms
new Semaphore(int permits, boolean fair); // 构造方法,permits许可线程数量 fair公平性,true:下次执行的线程是等待最久的线程
如果凭证不足够就会进入CLH队列然后阻塞
其它线程释放凭证之后会唤醒CLH队列中的第一个线程
唤醒的线程判断节点是不是头节点,如果是head就会不断尝试,是否有足够凭证,如果有凭证自己就会往下执行逻辑(逻辑执行完后就释放),然后往CHL队列后面传播
不是头节点就会改变信号量状态,改为唤醒状态,然后park阻塞
CountDownLaunch
它能够使一个线程等待其它线程完成各自工作后再往下执行
CountDownLaunch通过计数器实现,计数器初始值为线程数量,一个线程完成计数器-1,计数器=0说明所有线程完成工作
main等待多个线程执行完成后再往下执行
// 计数 CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(() -> { System.out.println(Thread.currentThread().getName()+" 完成"); // 一个线程执行完之后释放 计数-1 countDownLatch.countDown(); },"写代码").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+" 完成"); // 一个线程执行完之后释放 计数-1 countDownLatch.countDown(); },"洗漱").start(); countDownLatch.await(); //等待所有任务完成才往下走 System.out.println(Thread.currentThread().getName()+" 睡觉");
反过来使用,让多个线程在同一时刻起跑
// 计数 CountDownLatch countDownLatch = new CountDownLatch(1); for (int i=0;i<10;i++) { new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "完成"); },"线程" + i).start(); } // 睡眠3S后让10个线程同时跑(这样线程都是创建完毕的,然后统一跑) Thread.sleep(3000); countDownLatch.countDown();
CyclicBarrier
线程栅栏可以让一组线程等待至某个状态(屏障点)之后再全部同时执行。他和countdownlaunch不同可以重复使用
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
@Override
public void run() {
// 当所有线程达到屏障就执行
System.out.println("线程就位,发车");
}
});
for (int i=0;i<10;i++) {
new MyThread(cyclicBarrier).start();
}
// 反复使用
Thread.sleep(2000);
for (int i=0;i<10;i++) {
new MyThread(cyclicBarrier).start();
}
}
static class MyThread extends Thread {
private CyclicBarrier cyclicBarrier;
MyThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "执行");
// 告诉CyclicBarrier我已经到达了屏障
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
atomic
保证原子性,Atomic包12个类,4种原子更新方式,实现大多使用Unsafe
用的无锁算法CAS
底层基于魔术类Unsafe提供的CAS-API完成
- CAS三大API
atomic使用compareAndSwapObject compareAndSwapInt compareAndSwapLong 这三个基于硬件原语——CMPXCHG实现原子操作CAS
public static int total; static AtomicInteger atomicInteger = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { new Thread(() -> { for (int j = 0; j < 10000; j++) { // total++; atomicInteger.getAndIncrement(); } countDownLatch.countDown(); }, "线程" + i).start(); } // 等待10线程执行完 countDownLatch.await(); System.out.println(atomicInteger.get()); }
- AtomicInteger底层
do { oldValue = this.getIntVolatile(var1, var2); // 读取AtomicInteger的value值 // valueOffset:value属性在对象内存当中的偏移量 // 想要CAS修改某个对象的值,需要知道属性在对象空间的哪个位置,必须知道偏移量 } while(!this.compareAndSwapInt(AtomicInteger, valueOffset, oldValue, oldValue + 1)); return var5;
- 原子修改数组
static int[] arr = new int[]{1,2}; static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(arr); // 原子修改 atomicIntegerArray.getAndSet(0, 100); // atomicArray会修改 System.out.println(atomicIntegerArray.get(0)); // 但是数组本身不会修改 System.out.println(arr[0]);
- 原子修改自定义对象
static Student[] students = new Student[]{new Student(1, "bob")}; static AtomicReferenceArray objectArray = new AtomicReferenceArray(students); System.out.println("------- 自定义对象 ------"); objectArray.set(0, new Student(2, "tom")); System.out.println(objectArray.get(0));
- 原子修改对象属性
// 这里需要修改的属性name必须是public volatile static AtomicReferenceFieldUpdater fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name"); System.out.println("------ 自定义对象属性 ------"); Student student = new Student(3, "alice"); fieldUpdater.set(student, "alice2"); System.out.println(student);
CAS ABA
ABA问题:线程1查看A值100,线程2修改A值 +1 -1,然后线程1再次查看A值100。线程1认为A值并未发生过改变
static AtomicInteger atomicInteger = new AtomicInteger(100);
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
int a = atomicInteger.get();
System.out.println("线程1获取a值:" + a);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCasSuccess = atomicInteger.compareAndSet(100, 200);
if (isCasSuccess) {
System.out.println("CAS成功,A:" + atomicInteger.get());
} else {
System.out.println("CAS失败");
}
}, "线程1");
Thread thread2 = new Thread(() -> {
atomicInteger.getAndIncrement();
System.out.println("线程2修改A:" + atomicInteger.get());
atomicInteger.decrementAndGet();
System.out.println("线程2修改A:" + atomicInteger.get());
}, "线程2");
thread1.start();
thread2.start();
}
// 最终CAS会成功
- 利用版本号解决ABA问题,CAS不仅判断数据是否一样,还判断版本是否一样
/** * 100初始值, 0版本号 */ static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 0); public static void main(String[] args) { Thread thread1 = new Thread(() -> { int stamp = atomicStampedReference.getStamp(); int a = atomicStampedReference.getReference(); System.out.println("线程1获取a值:" + a + " 版本号:" + stamp); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } boolean isCasSuccess = atomicStampedReference.compareAndSet(a, a + 100, stamp, stamp + 1); if (isCasSuccess) { System.out.println("CAS成功,A:" + atomicStampedReference.getReference() + " 版本号:" + atomicStampedReference.getStamp()); } else { System.out.println("CAS失败"); } }, "线程1"); Thread thread2 = new Thread(() -> { // 第一次修改 int stamp = atomicStampedReference.getStamp(); int a = atomicStampedReference.getReference(); atomicStampedReference.compareAndSet(a, a + 1, stamp, stamp + 1); System.out.println("线程2修改A:" + atomicStampedReference.getReference() + " 版本号:" + atomicStampedReference.getStamp()); // 第二次修改 stamp = atomicStampedReference.getStamp(); a = atomicStampedReference.getReference(); atomicStampedReference.compareAndSet(a, a - 1, stamp, stamp + 1); System.out.println("线程2修改A:" + atomicStampedReference.getReference() + " 版本号:" + atomicStampedReference.getStamp()); }, "线程2"); thread1.start(); thread2.start(); } // CAS失败
Unsafe类
魔术类是sun.misc包下,提供用于执行低级别,不安全操作的方法。如直接访问系统内存资源,自主管理内存资源等。这些方法在提升Java允许效率,增强java底层资源能力起到很大作用
由于Unsafe拥有类似C指针一样操作内存空间的能力,增加了程序发生指针问题的风险。使用一定要慎重
Unsafe为单例类,提供getUnsafe获取单例,只有调用的类为引导类加载器所加载才合法
- 内存管理
文件大量并发上传,堆可能经常满,GC
可以用Unsafe申请堆外内存,推外内存不属于GC管理,用完一定手动释放,否则内存泄露public class UnsafeManageMemory { private Unsafe unsafe; private long address; // 获取单例 public static Unsafe getUnsafe() throws IllegalAccessException, NoSuchFieldException { Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); //Field unsafeField = Unsafe.class.getDeclaredFields()[0]; //也可以这样,作用相同 unsafeField.setAccessible(true); return (Unsafe) unsafeField.get(null); } void doSomething() throws NoSuchFieldException, IllegalAccessException { long file = 65536L; byte size = 8; // 获取单例 unsafe = getUnsafe(); // 分配内存 address = unsafe.allocateMemory(size); System.out.println("address:" + address); // 写入内存 unsafe.putAddress(address, file); // 读内存 long readValue = unsafe.getAddress(address); System.out.println("read:" + readValue); } public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException { UnsafeManageMemory unsafeManageMemory = new UnsafeManageMemory(); unsafeManageMemory.doSomething(); } @Override protected void finalize() throws Throwable { super.finalize(); // 释放内存 unsafe.freeMemory(address); } }
- 其它
unsafe.monitorEnter(object); // 加锁 unsafe.monitorExit(object); // 解锁 unsafe.park(true, 0L); // 阻塞true:ms定时,false:ns定时 unsafe.unpark(thread); // 唤醒
线程
线程是调度CPU资源最小单位,线程模型分为KLT ULT,JVM使用KLT:Java线程与OS线程保存1:1,一个Java线程对应在操作系统一个线程
状态state:
NEW 新建
RUNNABLE 运行
BLOCKED 阻塞
WAITING 等待
TIMED_WAITING 超时等待
TERMINATED 终结
协程:纤程,用户线程,为了追求最大力度发挥硬件性能和提升软件速度,在某个点挂起当前任务,保留栈信息,然后执行另外的任务。整个过程不需要上下文切换
Java原生不支持协程,需要引入三方包,quasar
线程池
线程池就是线程缓存,对线程进行统一分配,调优,监控
线程是重要稀缺资源,线程池能够重用线程,减少线程创建,消除的开销
线程数量设置:
CPU密集型:CPU核数+1
IO密集型:2CPU核数+1
其它:CPU核数 * [1 + (I/O耗时 / CPU耗时)]
线程池适用:
- 单个任务处理时间短
- 处理任务数量大
参数
线程池5种状态:RUNNING:接收新任务,处理。SHUTDOWN:不接收新任务,只处理存在任务。STOP不接收,不处理。TIDYING:所有任务终止。TERMINATED:线程池彻底终止int corePoolSize // 核心线程数 int maximumPoolSize // 最大线程数 long keepAliveTime // 最大允许不干活时间 TimeUnit unit // keepAliveTime时间单位 BlockingQueue<Runnable> workQueue // 存放未来得及执行的任务 ThreadFactory threadFactory // 创建线程的工厂 RejectedExecutionHandler handler // 拒绝策略
threadPool.shutdown(); // running -> shutdown threadPool.shutdownNow() // running -> stop
- 一开始创建线程池没有一个线程,任务来了再创建线程,来一个创建一个线程(不会复用线程,直到创建完核心线程数个线程,之后再复用线程)
- 任务来了,核心线程没满就创建线程去执行任务,满了就放到阻塞队列(核心线程执行完任务去阻塞队列拿新任务)。阻塞队列满了就往非核心线程放,所有线程满了就走拒绝策略。
- 拒绝策略
AbortPolicy:抛出RejectedExecutionException异常(默认) DiscardPolicy:也是丢弃任务,但是不抛出异常。(啥也不干) DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) CallerRunsPolicy:由调用线程处理该任务 拒绝策略可以重写,比如可以把任务序列号放到redis,然后拿回来放到线程池执行
非核心线程超时后(线程数>核心线程数的时候)(线程不分核心,非核心,最后剩余的就是核心的线程,超出的就是非核心),会释放掉
其中allowCoreThreadTimeOut参数设置为true的情况(默认false),核心线程超时也会释放(线程超时就释放)
线程超时:线程跑到阻塞队列拿任务,没拿到,阻塞队列为空然后再过keepAliveTime时长后就超时
定时线程池
ScheduleThreadPoolExecutor 定时线程池:处理延时任务or定时任务
继承ThreadPoolExecutor,没有非核心线程的说法
抛出异常会干掉异常的任务
- 延迟执行
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); ScheduledFuture<Integer> scheduledFuture = scheduledThreadPoolExecutor.schedule(() -> { System.out.println("延迟执行"); return 1; }, 2000, TimeUnit.MILLISECONDS); System.out.println("do something"); // 等待任务执行完 System.out.println(scheduledFuture.get()); System.out.println("finally");
- 定时执行
static Logger scheduleLog = Logger.getLogger("scheduleLog"); // 发心跳,定时2S执行 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1); scheduledThreadPoolExecutor1.scheduleAtFixedRate(() -> { scheduleLog.info("heart beat"); try { // 线程执行直接大于定时间隔时间,会间隔5秒执行一次 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }, 1000, 2000, TimeUnit.MILLISECONDS);
- 推迟执行
static Logger scheduleLog = Logger.getLogger("scheduleLog"); // 比上一次执行推迟2S执行 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1); scheduledThreadPoolExecutor2.scheduleWithFixedDelay(() -> { scheduleLog.info("推迟heart beat"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }, 1000, 2000, TimeUnit.MILLISECONDS);
- Timer
// Timer不推荐使用,抛出异常会把线程干掉,接下来就无法执行新来的task Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { System.out.println("timer定时"); } }, 1000, 2000);
底层
底层采用DelayQueue存储等待任务
DelayQueue内部封装PriorityQueue,根据time先后顺序排序,给任务排序
任务来了直接放到队列,不是周期任务就执行一次(suepr.run),周期任务设置下次执行时间,然后执行自己
DelayQueue -> 堆结构 -> 数组
最小堆结构,最小的放到最上面,左右节点无序
常用定时任务
- Timer:单线程,线程挂了,不会再创建线程执行任务
- ScheduleThreadPoolExecutor:线程挂了,任务丢弃,但是再提交任务可以继续执行
- xxl-job:定时任务 + 分布式调度
- quartz:单机的定时任务,功能强大
Fork/Join
并行执行任务框架,把大任务分割成若干小任务
适合计算类型
Future
线程的执行结果
Future<String> future = threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// restTemplate.getForObject...
Thread.sleep(2000);
return "test";
}
});
// 等待任务执行完,在这里park,结果出来了再唤醒
Object o = future.get(2, TimeUnit.SECONDS);
System.out.println(o);
工作
每个线程有各自对应的一个队列(双端队列),但是线程有可能窃取其它队列的任务执行
任务从队尾进入,线程获取自己队列从队尾拿,窃取其它队列任务从队头拿
构造参数
- int parallelism:并行度,默认和CPU个数一致
- ForkJoinWorkerThreadFactory factory:创建工作线程的工厂
- UncaughtExceptionHandler handler:线程异常处理器
- boolean asyncMode:队列中的任务调度方式,默认LIFO,可以设置为true(FIFO)
使用
一般只需使用ForkJoin提供的子类
RecursiveAction:用于没有返回结果的任务,这个会把自己工作分割成更小的几块,这样它们可以由独立的线程执行
RecursiveTask:用于有返回结果的任务,将工作分割若干更小任务,将这些子任务执行合并到一个集合里面
CountedCompleter:任务完成执行后会触发执行一个自定义的钩子函数
- 数组排序
public class TestForkJoinSort { public static void main(String[] args) { Random random = new Random(); int[] data = new int[5000000]; for(int i = 0 ; i < data.length ; i++){ data[i] = random.nextInt(5000000); } ForkJoinPool pool = new ForkJoinPool(); MySortTask main = new MySortTask(data , 0 , data.length); // invoke同步方法 主线程停下来,任务运行结束后,主线程运行 pool.invoke(main); for(int i = 0 ; i < 100 ; i++){ System.out.println(data[i]); } } } class MySortTask extends RecursiveAction{ int[] data; int start; int end; /** * 数组在1000个元素以下,不用拆分 */ static final int THRESHOLD = 1000; public MySortTask(int[] data, int start, int end) { super(); this.data = data; this.start = start; this.end = end; } @Override public void compute(){ if (end - start <= THRESHOLD){ Arrays.sort(data , start , end); } else{ int middle = (start+end) / 2; MySortTask task1 = new MySortTask(data, start, middle); MySortTask task2 = new MySortTask(data, middle, end); // 让task1 task2同时执行 invokeAll(task1 , task2); merge(middle); } } /** * description: 将2个有序的数组 合并成有序的大数组 */ void merge(int middle){ int[] a = Arrays.copyOfRange(data, start, middle); int[] b = Arrays.copyOfRange(data, middle, end); int x = 0; int y = 0; for(int i = start ; i < end ; i++){ if (x == a.length) { data[i] = b[y++]; } else if (y == b.length) { data[i] = a[x++]; } else if (a[x] < b[y]) { data[i] = a[x++]; } else { data[i] = b[y++]; } } } }
- 求和
public class TestForkJoin2 extends RecursiveTask<Integer> { private final static int THRESHOLD = 10; private int[] numbers; private int start; private int end; public TestForkJoin2(int[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Integer compute() { if (end - start < THRESHOLD) { System.out.println(" from index = " + start + " , toIndex=" + end); int result = 0; for (int i = start; i <= end; i++) { result += numbers[i]; } return result; } else { int mid = (end + start) / 2; // 拆分任务 TestForkJoin2 task1 = new TestForkJoin2(numbers, start, mid); TestForkJoin2 task2 = new TestForkJoin2(numbers, mid + 1, end); invokeAll(task1, task2); return task1.join() + task2.join(); } } public static void main(String[] args) { int[] numbers = new int[1000]; for (int i = 0; i < numbers.length; i++) { numbers[i] = i + 1; } final ForkJoinPool forkJoinPool = new ForkJoinPool(); long t1 = System.currentTimeMillis(); ForkJoinTask<Integer> task = forkJoinPool.submit(new TestForkJoin2(numbers, 0, numbers.length - 1)); try { Integer sumResult = task.get(); System.out.println(sumResult); } catch (Exception e) { e.printStackTrace(); } long t2 = System.currentTimeMillis(); System.out.println("求和花费的时间: " + (t2 - t1) + " ms"); } }
Disruptor
为了解决高并发下队列锁的问题,能够在无锁的情况下实现队列的并发操作
- ArrayBlockingQueue:基于数组形式的队列,通过加锁方式,保证多线程数据安全
- LinkedBlockingQueue:基于链表形式队列,通过加锁方式,保证多线程数据安全
- ConcurrentLinkedQueue:基于链表形式,CAS协议方式保证多线程数据安全,不加锁主要依赖Unsafe类实现
设计原理
- 环形数组结构:为了避免垃圾回收,采用数组,数组对于处理器的缓存机制更加友好
- 元素位置定位:数组长度2^n,通过位运算,加快定位速度,下表采取递增形式,不用担心inex溢出,index是long类型
- 无锁设计:每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后直接在该位置写入或读取
工作
RingBuffer:Disruptor底层数据结构实现,线程间交换数据中转地
sequence序列号:生产者CAS竞争序列号写如到数组,写一次序列号+1
sequenceBarrier栅栏:生产者没有消费者快,barrier会告诉消费者阻塞,执行等待策略
EventHandler:消费者接口,实现业务逻辑(各自有自己的cursor,自己消费自己的)
Producer:生产者接口,第三方线程充当该角色,向RingBuffer写入事件
- 使用
public class DisruptorTest { public static void main(String[] args) throws InterruptedException { // 生产者 OrderFactory factory = new OrderFactory(); // 大小 int bufferSize = 1024; // 定义 Disruptor<Order> disruptor = new Disruptor<Order>(factory, bufferSize, Executors.newCachedThreadPool(), ProducerType.SINGLE, new YieldingWaitStrategy()); // 连接消费端方法 disruptor.handleEventsWith(new OrderHandler(), new OrderHandler()); // 启动 disruptor.start(); // 拿到实际存储队列 RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); // 生产 for (int i = 0; i < 10; i++) { long sequence = ringBuffer.next(); Order order = ringBuffer.get(sequence); order.setId(i); //提交发布操作 ringBuffer.publish(sequence); System.out.println("生产者发布 id:" + order.getId()); } Thread.sleep(1000); disruptor.shutdown(); } static class Order { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } } /** * 工厂,填充队列 */ static class OrderFactory implements EventFactory<Order> { @Override public Order newInstance() { return new Order(); } } /** * 消费者 */ static class OrderHandler implements EventHandler<Order> { @Override public void onEvent(Order order, long l, boolean b) throws Exception { System.out.println(Thread.currentThread().getName() + "消费id:" + order.getId()); } } }