Java并发


Java并发

CPU

现代CPU为了提升执行效率,为了减少CPU与内存交互,在CPU上做了多级缓存,一般是三级缓存

  • 一级缓存:数据缓存和指令缓存,逻辑核独占
  • 二级缓存:物理核独占,一个物理核心一个L1和L2,逻辑核共享
  • 三级缓存:所有物理核共享
    CPU寄存器 > L1 > L2 > L3 > 内存
    缓存由最小的存储区块——缓存行(缓存由N个缓存行组成)组成,缓存行一般64字节
    CPU读的时候依次从L1 L2 L3 内存去找,在内存找到会依次给L3 L2 L1都拷贝一份
    空间局部性原则:去内存拿数据时,会把目标相邻的数据也读到缓存
    时间局部性原则:一个数据正在被访问,那么近期很可能会被再次访问,缓存不会丢弃
    线程上下文切换:线程切换,保存上一个线程的信息,加载下一个线程信息,数据保存在内存Task State Segment(任务状态段)
    运行状态切换:内核态,用户态
    内存用户空间(保护系统):JVM,软件,360
    内存内核空间:OS

CPU运行安全级别

ring0:最高级别,内核态,可以执行很多操作,刷盘,很敏感
ring1
ring2
ring3:用户态
Linux和Windows只用了ring0,ring3两个级别
Java里面线程创建调度和管理由内核完成,内核保存线程的状态和上下文信息。

volatile

Java虚拟机轻量级同步机制

  • 保证被volatile修饰的变量对所有线程可见,一个线程修改了共享变量,会及时通知其它线程。(非volatile共享变量在不同线程中修改值,其它线程早晚也会知道,只是不能及时知道变量变化了)
  • 每个线程会在自己线程独有保存一个共享变量的值,一个线程修改了共享变量值之后,会通知其它线程值已修改,其它线程丢弃自己的脏数据,或者覆盖脏数据,然后读取新值
  • 禁止指令重排,JVM会根据代码逻辑进行智能优化,重排代码执行顺序。增加性能,高并发下可能出问题

Monitor监视器锁

一种同步机制,一个对象。Java对象天生带了Monitor锁,也就是synchronized的对象锁,这些状态都放在对象头Mark Word
任何对象都有一个Monitor与之关联,当一个Monitor被持有后,它就处于锁定状态。依赖OS,-> Mutex互斥量 -> 操作系统维护(要切换内核态)
查看对象内存布局

<dependency>
	<groupId>org.openjdk.jol</groupId>
	<artifactId>jol-core</artifactId>
	<version>0.10</version>
</dependency>

Object o = new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());

synchronized

同步锁,内置锁,隐式锁,保证原子性,可重入
基于进入和退出Monitor对象来实现同步
锁膨胀:JDK1.6后会自适应升级,无锁,偏向锁,轻量级锁,重量级锁
锁消除:JVM检测同步代码不可能存在锁竞争的情况就会清除同步锁。实现基于逃逸分析,比如只在一个方法内执行,不会逃逸出这个方法,那么就不会被其它线程使用。
锁的粗化:将多个连续的加锁,解锁操作 优化成一个大的同步锁
自旋锁:不断循环,尝试获取锁,可以避免一些线程的挂起和恢复(这个过程要用户态转入内核态,比较慢)

  1. 内核态:CPU可以访问内存所有数据,包括外围设备,例如硬盘,网卡。CPU也可以将自己从一个程序切换到另一个程序
  2. 用户态:只能受限的访问内存,且不允许访问外围设备,占用CPU的能力被剥夺,CPU资源可以被其他程序获取。
    这样区分是为了防止用户进程获取别的程序的内存数据。
    Synchronized依赖操作系统实现,使用同步锁需要进行用户态到内核态的切换。简单来说,在JVM中monitor enter和monitor exit字节码是依赖于底层操作系统的Mutex Lock来实现,但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的。

AQS和ReentrantLock

AQS 的全称 AbstractQueuedSynchronizers抽象队列同步器
在Java并发包(java.util.concurrent)中很多锁都是通过AQS来实现加锁和释放锁的过程
ReentrantLock是AQS框架的应用实现,线程同步的手段,具有比synchronized更多特性,支持手动加锁,解锁,支持加锁公平性,可重入

ReentrantLock lock = new ReentrantLock(false); // true公平锁 false非公平锁
lock.lock()
lock.unlock()

加锁,释放锁逻辑

lock.lock() //加锁
while(true){
    if(cas加锁成功){
        break;
    }
    HashSet,LikedQueued(),
    HashSet.add(Thread)
    LikedQueued.put(Thread)
    LockSupport.park(); //阻塞
}
// 拿到锁后执行业务逻辑
 
lock.unlock() //解锁
Thread  t= HashSet.get()
Thread  t = LikedQueued.take();
LockSupport.unpark(t);

核心实现:自旋,LocksSuport, CAS(加锁,不管有多少线程来,永远要保证只有1个线程能加锁成功)
使用queue队列,原因:要想实现公平和非公平,需要排队,FIFO。就保证了公平性。

  • 公平锁:竞争锁需要排队,当可以获取锁时还需判断队列有没有线程排队,没线程就CAS修改state
  • 非公平锁:当可以获取锁时就竞争锁(CAS修改state)

AQS包含三个组件

exclusiveOwnerThread 当前获取锁的线程是谁
state 状态器(为0就没有线程持有,无锁状态)
同步等待队列(CHL队列):基于双向链表数据结构的队列FIFO
  • 锁竞争:state=0,没人就CAS;state=1,有人就判断当前获取锁的线程是不是自己,是就state+1,不是自己就加锁失败

Semaphore

信号量,控制访问特定资源线程数目,底层依赖AQS的State
可以做服务限流

public static void main(String[] args) {
       Semaphore semaphore = new Semaphore(2); // 10个线程一次允许2个允许
       for (int i = 0;i < 10;i++) {
           new Thread(new MyTask(semaphore, "t" + i)).start();
       }
   }

   static class MyTask extends Thread{
       Semaphore semaphore;

       public MyTask(Semaphore semaphore, String tName) {
           super(tName);
           this.semaphore = semaphore;
       }

       @Override
       public void run() {
           try {
               // semaphore.acquire(); // 获取公共资源
               if (semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)) { // 如果拿到了执行下面的,拿不到最多等待500ms
                   System.out.println(Thread.currentThread().getName() + "run");
                   Thread.sleep(2000); // do something
                   semaphore.release(); // 释放公共资源
               } else {
                   System.out.println("降级"); // 降级
               }


           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
Semaphore semaphore = new Semaphore(2); // state初始值2,并行度2,总容量池
semaphore.acquire();                    // 每个线程获取1个凭据,state-1(这里可以带参n,获取n个凭据,返还也返还n个)
semaphore.release();                    // 凭据归还,state+1
semaphore.tryAcquire(500, TimeUnit.MILLISECONDS) // 尝试获取,拿不到最多等待500ms
new Semaphore(int permits, boolean fair); // 构造方法,permits许可线程数量 fair公平性,true:下次执行的线程是等待最久的线程
如果凭证不足够就会进入CLH队列然后阻塞
其它线程释放凭证之后会唤醒CLH队列中的第一个线程
唤醒的线程判断节点是不是头节点,如果是head就会不断尝试,是否有足够凭证,如果有凭证自己就会往下执行逻辑(逻辑执行完后就释放),然后往CHL队列后面传播
不是头节点就会改变信号量状态,改为唤醒状态,然后park阻塞

CountDownLaunch

它能够使一个线程等待其它线程完成各自工作后再往下执行
CountDownLaunch通过计数器实现,计数器初始值为线程数量,一个线程完成计数器-1,计数器=0说明所有线程完成工作

  • main等待多个线程执行完成后再往下执行

    // 计数
          CountDownLatch countDownLatch = new CountDownLatch(2);
    
          new Thread(() -> {
              System.out.println(Thread.currentThread().getName()+" 完成");
              // 一个线程执行完之后释放 计数-1
              countDownLatch.countDown();
          },"写代码").start();
    
          new Thread(() -> {
              System.out.println(Thread.currentThread().getName()+" 完成");
              // 一个线程执行完之后释放 计数-1
              countDownLatch.countDown();
          },"洗漱").start();
    
          countDownLatch.await(); //等待所有任务完成才往下走
          System.out.println(Thread.currentThread().getName()+" 睡觉");
  • 反过来使用,让多个线程在同一时刻起跑

    // 计数
          CountDownLatch countDownLatch = new CountDownLatch(1);
          for (int i=0;i<10;i++) {
              new Thread(() -> {
                  try {
                      countDownLatch.await();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(Thread.currentThread().getName() + "完成");
              },"线程" + i).start();
          }
          // 睡眠3S后让10个线程同时跑(这样线程都是创建完毕的,然后统一跑)
          Thread.sleep(3000);
          countDownLatch.countDown();

CyclicBarrier

线程栅栏可以让一组线程等待至某个状态(屏障点)之后再全部同时执行。他和countdownlaunch不同可以重复使用

public static void main(String[] args) throws InterruptedException {
       CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
           @Override
           public void run() {
               // 当所有线程达到屏障就执行
               System.out.println("线程就位,发车");
           }
       });
       for (int i=0;i<10;i++) {
           new MyThread(cyclicBarrier).start();
       }

       // 反复使用
       Thread.sleep(2000);
       for (int i=0;i<10;i++) {
           new MyThread(cyclicBarrier).start();
       }
   }

   static class MyThread extends Thread {
       private CyclicBarrier cyclicBarrier;

       MyThread(CyclicBarrier cyclicBarrier) {
           this.cyclicBarrier = cyclicBarrier;
       }

       @Override
       public void run() {
           try {
               System.out.println(Thread.currentThread().getName() + "执行");
               // 告诉CyclicBarrier我已经到达了屏障
               cyclicBarrier.await();
           } catch (InterruptedException e) {
               e.printStackTrace();
           } catch (BrokenBarrierException e) {
               e.printStackTrace();
           }
       }
   }

atomic

保证原子性,Atomic包12个类,4种原子更新方式,实现大多使用Unsafe
用的无锁算法CAS
底层基于魔术类Unsafe提供的CAS-API完成

  • CAS三大API
    compareAndSwapObject
    compareAndSwapInt
    compareAndSwapLong
    这三个基于硬件原语——CMPXCHG实现原子操作CAS
    atomic使用
    public static int total;
       static AtomicInteger atomicInteger = new AtomicInteger(0);
    
       public static void main(String[] args) throws InterruptedException {
           CountDownLatch countDownLatch = new CountDownLatch(10);
    
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   for (int j = 0; j < 10000; j++) {
                       // total++;
                       atomicInteger.getAndIncrement();
                   }
                   countDownLatch.countDown();
               }, "线程" + i).start();
           }
           // 等待10线程执行完
           countDownLatch.await();
           System.out.println(atomicInteger.get());
       }
  • AtomicInteger底层
    do {
    	oldValue = this.getIntVolatile(var1, var2); // 读取AtomicInteger的value值
    	// valueOffset:value属性在对象内存当中的偏移量
    	// 想要CAS修改某个对象的值,需要知道属性在对象空间的哪个位置,必须知道偏移量
    } while(!this.compareAndSwapInt(AtomicInteger, valueOffset, oldValue, oldValue + 1));
    return var5;
  • 原子修改数组
    static int[] arr = new int[]{1,2};
       static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(arr);
    // 原子修改
    atomicIntegerArray.getAndSet(0, 100);
    // atomicArray会修改
       System.out.println(atomicIntegerArray.get(0));
    // 但是数组本身不会修改
       System.out.println(arr[0]);
  • 原子修改自定义对象
    static Student[] students = new Student[]{new Student(1, "bob")};
       static AtomicReferenceArray objectArray = new AtomicReferenceArray(students);
    System.out.println("------- 自定义对象 ------");
       objectArray.set(0, new Student(2, "tom"));
       System.out.println(objectArray.get(0));
  • 原子修改对象属性
    // 这里需要修改的属性name必须是public volatile
    static AtomicReferenceFieldUpdater fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
    System.out.println("------ 自定义对象属性 ------");
       Student student = new Student(3, "alice");
       fieldUpdater.set(student, "alice2");
       System.out.println(student);

CAS ABA

ABA问题:线程1查看A值100,线程2修改A值 +1 -1,然后线程1再次查看A值100。线程1认为A值并未发生过改变

static AtomicInteger atomicInteger = new AtomicInteger(100);

   public static void main(String[] args) {
       Thread thread1 = new Thread(() -> {
           int a = atomicInteger.get();
           System.out.println("线程1获取a值:" + a);
           try {
               Thread.sleep(3000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           boolean isCasSuccess = atomicInteger.compareAndSet(100, 200);
           if (isCasSuccess) {
               System.out.println("CAS成功,A:" + atomicInteger.get());
           } else {
               System.out.println("CAS失败");
           }
       }, "线程1");

       Thread thread2 = new Thread(() -> {
          atomicInteger.getAndIncrement();
           System.out.println("线程2修改A:" + atomicInteger.get());
          atomicInteger.decrementAndGet();
           System.out.println("线程2修改A:" + atomicInteger.get());
       }, "线程2");

       thread1.start();
       thread2.start();
   }
// 最终CAS会成功
  • 利用版本号解决ABA问题,CAS不仅判断数据是否一样,还判断版本是否一样
    /**
        * 100初始值, 0版本号
        */
       static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 0);
    
       public static void main(String[] args) {
           Thread thread1 = new Thread(() -> {
               int stamp = atomicStampedReference.getStamp();
               int a = atomicStampedReference.getReference();
               System.out.println("线程1获取a值:" + a + " 版本号:" + stamp);
               try {
                   Thread.sleep(3000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               boolean isCasSuccess = atomicStampedReference.compareAndSet(a, a + 100, stamp, stamp + 1);
               if (isCasSuccess) {
                   System.out.println("CAS成功,A:" + atomicStampedReference.getReference() + " 版本号:" + atomicStampedReference.getStamp());
               } else {
                   System.out.println("CAS失败");
               }
           }, "线程1");
    
           Thread thread2 = new Thread(() -> {
               // 第一次修改
               int stamp = atomicStampedReference.getStamp();
               int a = atomicStampedReference.getReference();
               atomicStampedReference.compareAndSet(a, a + 1, stamp, stamp + 1);
               System.out.println("线程2修改A:" + atomicStampedReference.getReference() + " 版本号:" + atomicStampedReference.getStamp());
               // 第二次修改
               stamp = atomicStampedReference.getStamp();
               a = atomicStampedReference.getReference();
               atomicStampedReference.compareAndSet(a, a - 1, stamp, stamp + 1);
               System.out.println("线程2修改A:" + atomicStampedReference.getReference() + " 版本号:" + atomicStampedReference.getStamp());
           }, "线程2");
    
           thread1.start();
           thread2.start();
       }
    // CAS失败

Unsafe类

魔术类是sun.misc包下,提供用于执行低级别,不安全操作的方法。如直接访问系统内存资源,自主管理内存资源等。这些方法在提升Java允许效率,增强java底层资源能力起到很大作用
由于Unsafe拥有类似C指针一样操作内存空间的能力,增加了程序发生指针问题的风险。使用一定要慎重
Unsafe为单例类,提供getUnsafe获取单例,只有调用的类为引导类加载器所加载才合法

  • 内存管理
    文件大量并发上传,堆可能经常满,GC
    可以用Unsafe申请堆外内存,推外内存不属于GC管理,用完一定手动释放,否则内存泄露
    public class UnsafeManageMemory {
        private Unsafe unsafe;
        private long address;
        // 获取单例
        public static Unsafe getUnsafe() throws IllegalAccessException, NoSuchFieldException {
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            //Field unsafeField = Unsafe.class.getDeclaredFields()[0]; //也可以这样,作用相同
            unsafeField.setAccessible(true);
            return (Unsafe) unsafeField.get(null);
        }
    
        void doSomething() throws NoSuchFieldException, IllegalAccessException {
            long file = 65536L;
            byte size = 8;
    		// 获取单例
            unsafe = getUnsafe();
            // 分配内存
            address = unsafe.allocateMemory(size);
            System.out.println("address:" + address);
            // 写入内存
            unsafe.putAddress(address, file);
            // 读内存
            long readValue = unsafe.getAddress(address);
            System.out.println("read:" + readValue);
        }
    
        public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
            UnsafeManageMemory unsafeManageMemory = new UnsafeManageMemory();
            unsafeManageMemory.doSomething();
        }
    
        @Override
        protected void finalize() throws Throwable
        {
            super.finalize();
    		// 释放内存
            unsafe.freeMemory(address);
        }
    }
  • 其它
    unsafe.monitorEnter(object); // 加锁
    unsafe.monitorExit(object);  // 解锁
    unsafe.park(true, 0L);       // 阻塞true:ms定时,false:ns定时
    unsafe.unpark(thread);       // 唤醒

线程

线程是调度CPU资源最小单位,线程模型分为KLT ULT,JVM使用KLT:Java线程与OS线程保存1:1,一个Java线程对应在操作系统一个线程
状态state:

  • NEW 新建

  • RUNNABLE 运行

  • BLOCKED 阻塞

  • WAITING 等待

  • TIMED_WAITING 超时等待

  • TERMINATED 终结

  • 协程:纤程,用户线程,为了追求最大力度发挥硬件性能和提升软件速度,在某个点挂起当前任务,保留栈信息,然后执行另外的任务。整个过程不需要上下文切换

  • Java原生不支持协程,需要引入三方包,quasar

线程池

线程池就是线程缓存,对线程进行统一分配,调优,监控
线程是重要稀缺资源,线程池能够重用线程,减少线程创建,消除的开销
线程数量设置:
CPU密集型:CPU核数+1
IO密集型:2CPU核数+1
其它:CPU核数 * [1 + (I/O耗时 / CPU耗时)]
线程池适用:

  • 单个任务处理时间短
  • 处理任务数量大
    参数
    int corePoolSize          // 核心线程数
    int maximumPoolSize       // 最大线程数
    long keepAliveTime        // 最大允许不干活时间
    TimeUnit unit             // keepAliveTime时间单位
    BlockingQueue<Runnable> workQueue // 存放未来得及执行的任务
    ThreadFactory threadFactory       // 创建线程的工厂
    RejectedExecutionHandler handler  // 拒绝策略
    线程池5种状态:RUNNING:接收新任务,处理。SHUTDOWN:不接收新任务,只处理存在任务。STOP不接收,不处理。TIDYING:所有任务终止。TERMINATED:线程池彻底终止
    threadPool.shutdown();   // running -> shutdown
    threadPool.shutdownNow() // running -> stop
  • 一开始创建线程池没有一个线程,任务来了再创建线程,来一个创建一个线程(不会复用线程,直到创建完核心线程数个线程,之后再复用线程)
  • 任务来了,核心线程没满就创建线程去执行任务,满了就放到阻塞队列(核心线程执行完任务去阻塞队列拿新任务)。阻塞队列满了就往非核心线程放,所有线程满了就走拒绝策略。
  • 拒绝策略
    AbortPolicy:抛出RejectedExecutionException异常(默认)
    DiscardPolicy:也是丢弃任务,但是不抛出异常。(啥也不干)
    DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    CallerRunsPolicy:由调用线程处理该任务
    拒绝策略可以重写,比如可以把任务序列号放到redis,然后拿回来放到线程池执行

    非核心线程超时后(线程数>核心线程数的时候)(线程不分核心,非核心,最后剩余的就是核心的线程,超出的就是非核心),会释放掉
    其中allowCoreThreadTimeOut参数设置为true的情况(默认false),核心线程超时也会释放(线程超时就释放)
    线程超时:线程跑到阻塞队列拿任务,没拿到,阻塞队列为空然后再过keepAliveTime时长后就超时

定时线程池

ScheduleThreadPoolExecutor 定时线程池:处理延时任务or定时任务
继承ThreadPoolExecutor,没有非核心线程的说法
抛出异常会干掉异常的任务

  • 延迟执行
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
           ScheduledFuture<Integer> scheduledFuture =  scheduledThreadPoolExecutor.schedule(() -> {
               System.out.println("延迟执行");
               return 1;
           }, 2000, TimeUnit.MILLISECONDS);
           System.out.println("do something");
           // 等待任务执行完
           System.out.println(scheduledFuture.get());
           System.out.println("finally");
  • 定时执行
    static Logger scheduleLog = Logger.getLogger("scheduleLog");
    // 发心跳,定时2S执行
          ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1);
          scheduledThreadPoolExecutor1.scheduleAtFixedRate(() -> {
              scheduleLog.info("heart beat");
              try {
                  // 线程执行直接大于定时间隔时间,会间隔5秒执行一次
                  Thread.sleep(5000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }, 1000, 2000, TimeUnit.MILLISECONDS);
  • 推迟执行
    static Logger scheduleLog = Logger.getLogger("scheduleLog");
    // 比上一次执行推迟2S执行
          ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1);
          scheduledThreadPoolExecutor2.scheduleWithFixedDelay(() -> {
              scheduleLog.info("推迟heart beat");
              try {
                  Thread.sleep(5000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }, 1000, 2000, TimeUnit.MILLISECONDS);
  • Timer
    // Timer不推荐使用,抛出异常会把线程干掉,接下来就无法执行新来的task
    Timer timer = new Timer();
          timer.scheduleAtFixedRate(new TimerTask() {
              @Override
              public void run() {
                  System.out.println("timer定时");
              }
          }, 1000, 2000);

底层

底层采用DelayQueue存储等待任务
DelayQueue内部封装PriorityQueue,根据time先后顺序排序,给任务排序
任务来了直接放到队列,不是周期任务就执行一次(suepr.run),周期任务设置下次执行时间,然后执行自己
DelayQueue -> 堆结构 -> 数组
最小堆结构,最小的放到最上面,左右节点无序

常用定时任务

  • Timer:单线程,线程挂了,不会再创建线程执行任务
  • ScheduleThreadPoolExecutor:线程挂了,任务丢弃,但是再提交任务可以继续执行
  • xxl-job:定时任务 + 分布式调度
  • quartz:单机的定时任务,功能强大

Fork/Join

并行执行任务框架,把大任务分割成若干小任务
适合计算类型

Future

线程的执行结果

Future<String> future =  threadPoolExecutor.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
		// restTemplate.getForObject...
              Thread.sleep(2000);
              return "test";
          }
      });
// 等待任务执行完,在这里park,结果出来了再唤醒
      Object o = future.get(2, TimeUnit.SECONDS);
      System.out.println(o);

工作

每个线程有各自对应的一个队列(双端队列),但是线程有可能窃取其它队列的任务执行
任务从队尾进入,线程获取自己队列从队尾拿,窃取其它队列任务从队头拿

构造参数

  • int parallelism:并行度,默认和CPU个数一致
  • ForkJoinWorkerThreadFactory factory:创建工作线程的工厂
  • UncaughtExceptionHandler handler:线程异常处理器
  • boolean asyncMode:队列中的任务调度方式,默认LIFO,可以设置为true(FIFO)

使用

一般只需使用ForkJoin提供的子类
RecursiveAction:用于没有返回结果的任务,这个会把自己工作分割成更小的几块,这样它们可以由独立的线程执行
RecursiveTask:用于有返回结果的任务,将工作分割若干更小任务,将这些子任务执行合并到一个集合里面
CountedCompleter:任务完成执行后会触发执行一个自定义的钩子函数

  • 数组排序
    public class TestForkJoinSort {
    
        public static void main(String[] args) {
            Random random = new Random();
            int[] data = new int[5000000];
            for(int i = 0 ; i < data.length ; i++){
                data[i] = random.nextInt(5000000);
            }
    
            ForkJoinPool pool = new ForkJoinPool();
            MySortTask main = new MySortTask(data , 0 , data.length);
            // invoke同步方法  主线程停下来,任务运行结束后,主线程运行
            pool.invoke(main);
            for(int i = 0 ; i < 100 ; i++){
                System.out.println(data[i]);
            }
        }
    
    }
    
    class MySortTask extends RecursiveAction{
        int[] data;
        int start;
        int end;
        /**
         * 数组在1000个元素以下,不用拆分
         */
        static final int THRESHOLD = 1000;
    
        public MySortTask(int[] data, int start, int end) {
            super();
            this.data = data;
            this.start = start;
            this.end = end;
        }
    
        @Override
        public void compute(){
            if (end - start <= THRESHOLD){
                Arrays.sort(data , start , end);
            }
            else{
                int middle = (start+end) / 2;
                MySortTask task1 = new MySortTask(data, start, middle);
                MySortTask task2 = new MySortTask(data, middle, end);
    
                // 让task1 task2同时执行
                invokeAll(task1 , task2);
                merge(middle);
            }
        }
    
        /**
         * description: 将2个有序的数组 合并成有序的大数组
         */
        void merge(int middle){
            int[] a = Arrays.copyOfRange(data, start, middle);
            int[] b = Arrays.copyOfRange(data, middle, end);
    
            int x = 0;
            int y = 0;
            for(int i = start ; i < end ; i++){
                if (x == a.length) {
                    data[i] = b[y++];
                } else if (y == b.length) {
                    data[i] = a[x++];
                } else if (a[x] < b[y]) {
                    data[i] = a[x++];
                } else {
                    data[i] = b[y++];
                }
            }
        }
    }
  • 求和
    public class TestForkJoin2 extends RecursiveTask<Integer> {
    
        private final static int THRESHOLD = 10;
        private int[] numbers;
        private int start;
        private int end;
    
        public TestForkJoin2(int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            if (end - start < THRESHOLD) {
                System.out.println(" from index = " + start + " , toIndex=" + end);
                int result = 0;
                for (int i = start; i <= end; i++) {
                    result += numbers[i];
                }
                return result;
            } else {
                int mid = (end + start) / 2;
                // 拆分任务
                TestForkJoin2 task1 = new TestForkJoin2(numbers, start, mid);
                TestForkJoin2 task2 = new TestForkJoin2(numbers, mid + 1, end);
                invokeAll(task1, task2);
                return task1.join() + task2.join();
            }
        }
    
        public static void main(String[] args) {
            int[] numbers = new int[1000];
            for (int i = 0; i < numbers.length; i++) {
                numbers[i] = i + 1;
            }
            final ForkJoinPool forkJoinPool = new ForkJoinPool();
            long t1 = System.currentTimeMillis();
            ForkJoinTask<Integer> task = forkJoinPool.submit(new TestForkJoin2(numbers, 0, numbers.length - 1));
            try {
                Integer sumResult = task.get();
                System.out.println(sumResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
            long t2 = System.currentTimeMillis();
            System.out.println("求和花费的时间: " + (t2 - t1) + " ms");
        }
    
    }

Disruptor

为了解决高并发下队列锁的问题,能够在无锁的情况下实现队列的并发操作

  • ArrayBlockingQueue:基于数组形式的队列,通过加锁方式,保证多线程数据安全
  • LinkedBlockingQueue:基于链表形式队列,通过加锁方式,保证多线程数据安全
  • ConcurrentLinkedQueue:基于链表形式,CAS协议方式保证多线程数据安全,不加锁主要依赖Unsafe类实现

设计原理

  • 环形数组结构:为了避免垃圾回收,采用数组,数组对于处理器的缓存机制更加友好
  • 元素位置定位:数组长度2^n,通过位运算,加快定位速度,下表采取递增形式,不用担心inex溢出,index是long类型
  • 无锁设计:每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后直接在该位置写入或读取

工作


RingBuffer:Disruptor底层数据结构实现,线程间交换数据中转地
sequence序列号:生产者CAS竞争序列号写如到数组,写一次序列号+1
sequenceBarrier栅栏:生产者没有消费者快,barrier会告诉消费者阻塞,执行等待策略
EventHandler:消费者接口,实现业务逻辑(各自有自己的cursor,自己消费自己的)
Producer:生产者接口,第三方线程充当该角色,向RingBuffer写入事件

  • 使用
    public class DisruptorTest {
    
        public static void main(String[] args) throws InterruptedException {
            // 生产者
            OrderFactory factory = new OrderFactory();
            // 大小
            int bufferSize = 1024;
            // 定义
            Disruptor<Order> disruptor =
                    new Disruptor<Order>(factory, bufferSize, Executors.newCachedThreadPool(), ProducerType.SINGLE, new YieldingWaitStrategy());
            // 连接消费端方法
            disruptor.handleEventsWith(new OrderHandler(), new OrderHandler());
            // 启动
            disruptor.start();
            // 拿到实际存储队列
            RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
            // 生产
            for (int i = 0; i < 10; i++) {
                long sequence = ringBuffer.next();
                Order order = ringBuffer.get(sequence);
                order.setId(i);
                //提交发布操作
                ringBuffer.publish(sequence);
                System.out.println("生产者发布 id:" + order.getId());
            }
            Thread.sleep(1000);
            disruptor.shutdown();
        }
    
        static class Order {
            private int id;
            private String name;
    
            public int getId() {
                return id;
            }
    
            public void setId(int id) {
                this.id = id;
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
        }
    
        /**
         * 工厂,填充队列
         */
        static class OrderFactory implements EventFactory<Order> {
            @Override
            public Order newInstance() {
                return new Order();
            }
        }
    
    
        /**
         * 消费者
         */
        static class OrderHandler implements EventHandler<Order> {
            @Override
            public void onEvent(Order order, long l, boolean b) throws Exception {
                System.out.println(Thread.currentThread().getName() + "消费id:" + order.getId());
            }
        }
    }
    

  目录