Java基础-JUC

 

一、简介

JUC:JDK5提供了java.util.concurrent(简称JUC)包

在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架

还提供了设计用于多线程上下文中的 Collection 实现等

二、volatile 关键字

多个线程访问内存中的共享数据时,彼此是可见的。

public class TestVolatile {
    public static void main(String[] args) {
        MyThread td = new MyThread();
        new Thread(td).start();
        while (true) {
            if (td.isFlag()) {
                System.out.println("------------------");
                break;
            }
        }
    }
}

class MyThread implements Runnable {
    private boolean flag = false;
    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {}
        flag = true;
        System.out.println("flag=" + isFlag());
    }
    public boolean isFlag() {
        return flag;
    }
    public void setFlag(boolean flag) {
        this.flag = flag;
    }
}

上述代码运行,按照预期应该会打印 true 和横杠,但结果不是这样,它会只打印 true ,而且程序不会终止,进入死循环。这就说明两个线程拿到了不一样的flag值。

这里就要引出一个内存可见性的问题,JVM会为每个线程分配一个独立的缓存,用于提高执行效率。内存可见性确保当一个线程修改了对象状态后,其他线程能够看到变化。

Java内存模型(JMM)规定:
1.线程对共享变量的所有操作都必须在自己的工作内存中进行,不能直接从主内存中读取
2.不同线程之间无法直接访问其他线程工作内存中的变量,线程间变量值的传递需要通过主内存来完成

img

<font color=#009900>volatile关键字实现可见性</font>
1.保证volatile变量的可见性
2.不保证volatile变量复合操作的原子
3.深入来说是通过加入内存屏障和禁止重排序优化来实现的
<font color=#009900>指令重排序</font>:代码写的顺序与实际执行顺序不同,指令重排序是编译器或处理器为了提高程序性能而做的优化
1.编译器优化的重排序(编译器优化)
2.指令级并行重排序(处理器优化)
3.内存系统的重排序(处理器优化)
<font color=#009900>as-if-serial</font>:无论如何重排序,程序执行结果应与代码顺序执行结果一致
Java编译器,运行时和处理器都会保证Java在单线程下遵循as-if-serial语义

//修改实现
private volatile boolean flag = false;

<font color=#009900>synchronized实现可见性</font>
synchronized能够实现原子性(同步)和可见性
Java内存模型(JMM)关于synchronized的规定:
1.线程解锁前,必须把共享变量的最新值刷新到主内存中
2.线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主存中重新读取最新的值(注意:加锁与解锁需要是同一把锁)

//修改实现
synchronized (td){
    if (td.isFlag()) {
        System.out.println("------------------");
        break;
    }
}
//这样实现效率会非常低

参考文章

三、原子变量与CAS算法

原子性:一个操作是不可再分的,要么全部执行成功要么全部执行失败。没有成功一半的说法。

i++问题

int x = 10;
x = x++;
System.out.println(x);

结果会输出10,i++会分为几个操作执行

int temp = i;//读
i = i + 1;//改和写,这里i++操作就执行完了
i = temp;//赋值

多线程访问i++

public class TestAtomicDemo {
    public static void main(String[] args) {
        AtomicDemo ad = new AtomicDemo();
        for (int i = 0; i < 10; i++) {
            new Thread(ad).start();
        }
    }
}

class AtomicDemo implements Runnable {
    private volatile int serialNumber = 0;
    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {}
        System.out.println(getSerialNumber());
    }

    public int getSerialNumber() {
        return serialNumber++;
    }
}

img

可以看到加了volatile关键字多线程访问还是出现了相同值得情况,这就是因为i++操作不具有原子性

synchronized实现原子性

public synchronized int getSerialNumber() {
    return serialNumber++;
}
//这样效率太低,每次只有一个线程可以访问

使用原子变量,在 java.util.concurrent.atomic 包下提供了一些原子变量,可以保证内存可见性和原子性

class AtomicDemo implements Runnable {

    private AtomicInteger serialNumber = new AtomicInteger(0);

    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {}
        System.out.println(getSerialNumber());
    }

    public int getSerialNumber() {
        // 等同于 serialNumber++
        return serialNumber.getAndIncrement();
    }
}

原子变量的实现:CAS (Compare-And-Swap) 算法

是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问。

CAS 包含了 3 个操作数:
1.需要读写的内存值 V
2.进行比较的值 A
3.拟写入的新值 B
4.当且仅当 V 的值等于 A 时, CAS 通过原子方式用新值 B 来更新 V 的值,否则不会执行任何操作。

//模拟CAS算法
public class TestCompareAndSwap {
    public static void main(String[] args) {
        final CompareAndSwap cas = new CompareAndSwap();
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int expectedValue = cas.get();
                    boolean b = cas.compareAndSet(expectedValue, (int) (Math.random() * 101));
                    System.out.println(b);
                }
            }).start();
        }
    }
}

class CompareAndSwap {
    private int value;

    //获取内存值
    public synchronized int get() {
        return value;
    }

    //比较
    public synchronized int compareAndSwap(int expectedValue, int newValue) {
        int oldValue = value;
        if (oldValue == expectedValue) {
            this.value = newValue;
        }
        return oldValue;
    }

    //设置
    public synchronized boolean compareAndSet(int expectedValue, int newValue) {
        return expectedValue == compareAndSwap(expectedValue, newValue);
    }
}

四、ConcurrentHashMap的锁分段机制

ConcurrentHashMap 同步容器类是Java 5 增加的一个线程安全的哈希表。对与多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用“锁分段”机制替代 Hashtable 的独占锁。进而提高性能

此包还提供了设计用于多线程上下文中的 Collection 实现:

  • ConcurrentHashMap
  • ConcurrentSkipListMap
  • ConcurrentSkipListSet
  • CopyOnWriteArrayList
  • CopyOnWriteArraySet

当期望许多线程访问一个给定 collection 时, ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步的 TreeMap。当期望的读数和遍历远远大于列表的更新数时, CopyOnWriteArrayList 优于同步的 ArrayList。

//集合迭代时候进行操作
public class TestCopyOnWriteArrayList {
    public static void main(String[] args) {
        HelloThread ht = new HelloThread();

        for (int i = 0; i < 10; i++) {
            new Thread(ht).start();
        }
    }
}
class HelloThread implements Runnable {
    private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

    static {
        list.add("AA");
        list.add("BB");
        list.add("CC");
    }

    @Override
    public void run() {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
            list.add("AA");
        }
    }
}

五、闭锁 CountDownLatch

CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

//计算多个线程执行的时间
public class TestCountDownLatch {

    public static void main(String[] args) {
        //闭锁初始为50
        final CountDownLatch latch = new CountDownLatch(50);
        LatchDemo ld = new LatchDemo(latch);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 50; i++) {
            new Thread(ld).start();
        }
        try {
            //闭锁为0后继续执行
            latch.await();
        } catch (InterruptedException e) {}
        long end = System.currentTimeMillis();
        System.out.println("耗费时间为:" + (end - start));
    }
}

class LatchDemo implements Runnable {

    private CountDownLatch latch;

    public LatchDemo(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 50000; i++) {
                if (i % 2 == 0) {
                    System.out.println(i);
                }
            }
        } finally {
            //每执行完一个线程递减一
            latch.countDown();
        }
    }
}

六、Callable 接口创建线程

1.Callable 接口类似于 Runnable,但 Runnable 不会返回结果,并且无法抛出经过检查的异常
2.Callable 需要依赖FutureTask , FutureTask 也可以用作闭锁

public class TestCallable {

    public static void main(String[] args) {
        ThreadDemo td = new ThreadDemo();
        //执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
        FutureTask<Integer> result = new FutureTask<>(td);
        new Thread(result).start();
        try {
            //接收线程运算后的结果
            Integer sum = result.get();
            //会等到线程执行完毕打印,即 FutureTask 可用于闭锁
            System.out.println(sum);
            System.out.println("------------------------------------");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class ThreadDemo implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i <= 100000; i++) {
            sum += i;
        }
        return sum;
    }
}

七、Lock 同步锁

显示锁 Lock:

在 Java 5.0 之前,协调共享对象的访问时可以使用的机制只有 synchronized 和 volatile 。 Java 5.0 后增加了一些 新的机制,但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。

ReentrantLock 实现了 Lock 接口,并提供了与 synchronized 相同的互斥性和内存可见性。但相较于synchronized 提供了更高的处理锁的灵活性。

//模仿买票流程
public class TestLock {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(ticket, "1号窗口").start();
        new Thread(ticket, "2号窗口").start();
        new Thread(ticket, "3号窗口").start();
    }
}

class Ticket implements Runnable {

    private int tick = 100;
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            lock.lock(); //上锁
            try {
                if (tick > 0) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {}
                    System.out.println(Thread.currentThread().getName() + " 完成售票,余票为:" + --tick);
                }
            } finally {
                lock.unlock(); //释放锁,相对于synchronized可能出现死锁
            }
        }
    }
}

八、使用 Lock 同步锁完成唤醒机制

以经典的生产者和消费者为例

用 Object 加 synchronized 实现:

  • wait:告诉当前线程放弃执行权,并放弃监锁入阻塞状态,直到其他线程持并持有了相同的锁并调用notify为止
  • notify:唤醒持有同一个锁中调用wait的第一个线程,被唤醒的线程是进入了可运行状态。等待cpu分配时间片
  • notifyAll:唤醒持有同一锁中调用wait的所有的线程

虚假唤醒问题:

img

public class TestProductorAndConsumer {
    public static void main(String[] args) {
        Clerk clerk = new Clerk();
        Productor pro = new Productor(clerk);
        Consumer cus = new Consumer(clerk);
        new Thread(pro, "生产者 A").start();
        new Thread(cus, "消费者 B").start();
        new Thread(pro, "生产者 C").start();
        new Thread(cus, "消费者 D").start();
    }
}

//店员
class Clerk {
    private int product = 0;
    //进货
    public synchronized void get() {
        while (product >= 1) { // 避免虚假唤醒
            System.out.println("产品已满!");
            try {
                this.wait();
            } catch (InterruptedException e) {}
        }
        System.out.println(Thread.currentThread().getName() + " : " + ++product);
        this.notifyAll();
    }

    //卖货
    public synchronized void sale() {
        while (product <= 0) { // 避免虚假唤醒
            System.out.println("缺货!");
            try {
                this.wait();
            } catch (InterruptedException e) {}
        }
        System.out.println(Thread.currentThread().getName() + " : " + --product);
        this.notifyAll();
    }
}
//生产者
class Productor implements Runnable {
    private Clerk clerk;
    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {}
            clerk.get();
        }
    }
}
//消费者
class Consumer implements Runnable {
    private Clerk clerk;
    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            clerk.sale();
        }
    }
}

产生的都是有效数据,不会缺货了还在卖

img

用 Lock 同步锁实现,Condition控制线程通信

Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题, Condition 方法的名称与对应的 Object 版本中的不同

在 Condition 对象中,与 wait、 notify 和 notifyAll 方法对应的分别是await、 signal 和 signalAll

Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得Condition 实例,使用其 newCondition() 方法

public class TestProductorAndConsumerForLock {
    public static void main(String[] args) {
        Clerk clerk = new Clerk();
        Productor pro = new Productor(clerk);
        Consumer con = new Consumer(clerk);
        new Thread(pro, "生产者 A").start();
        new Thread(con, "消费者 B").start();
    }
}

class Clerk {
    private int product = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    // 进货
    public void get() {
        lock.lock();
        try {
            if (product >= 1) { // 为了避免虚假唤醒,应该总是使用在循环中。
                System.out.println("产品已满!");
                try {
                    condition.await();
                } catch (InterruptedException e) {}
            }
            System.out.println(Thread.currentThread().getName() + " : " + ++product);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    // 卖货
    public void sale() {
        lock.lock();
        try {
            if (product <= 0) {
                System.out.println("缺货!");
                try {
                    condition.await();
                } catch (InterruptedException e) {}
            }
            System.out.println(Thread.currentThread().getName() + " : " + --product);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
// 生产者
class Productor implements Runnable {
    private Clerk clerk;
    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clerk.get();
        }
    }
}
// 消费者
class Consumer implements Runnable {
    private Clerk clerk;
    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            clerk.sale();
        }
    }
}

九、线程按序交替问题

编写一个程序,开启 3 个线程,这三个线程的 ID 分别为A、 B、 C,每个线程将自己的 ID 在屏幕上打印 10 遍,要求输出的结果必须按顺序显示
如: ABCABCABC…… 依次递归

//方式一,一直循环,开销有点大
public class Demo {
    public static void main(String[] args) {
        final String[] x = {"A"};
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    while (true){
                        if (x[0].equals(Thread.currentThread().getName())){
                            System.out.println("第"+ (i+1) +"遍");
                            System.out.println(Thread.currentThread().getName());
                            x[0] = "B";
                            break;
                        }
                    }
                }
            }
        }, "A").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    while (true){
                        if (x[0].equals(Thread.currentThread().getName())){
                            System.out.println(Thread.currentThread().getName());
                            x[0] = "C";
                            break;
                        }
                    }
                }
            }
        }, "B").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    while (true){
                        if (x[0].equals(Thread.currentThread().getName())){
                            System.out.println(Thread.currentThread().getName());
                            x[0] = "A";
                            break;
                        }
                    }
                }
            }
        }, "C").start();
    }
}

img

//方式二
public class Demo2 {
    public static void main(String[] args) {
        final int[] x = {1};
        Lock lock = new ReentrantLock();
        Condition conditiona = lock.newCondition();
        Condition conditionb = lock.newCondition();
        Condition conditionc = lock.newCondition();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    if (x[0] != 1){
                        try {
                            conditiona.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName());
                    x[0] = 2;
                    conditionb.signal();
                }
            }
        }, "A").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    if (x[0] != 2){
                        try {
                            conditionb.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName());
                    x[0] = 3;
                    conditionc.signal();
                }
            }
        }, "B").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    if (x[0] != 3){
                        try {
                            conditionc.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName());
                    x[0] = 1;
                    conditiona.signal();
                }
            }
        }, "C").start();
    }
}

十、ReadWriteLock 读写锁

ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。。

ReadWriteLock 读取操作通常不会改变共享资源,但执行写入操作时,必须独占方式来获取锁。对于读取操作占多数的数据结构。 ReadWriteLock 能提供比独占锁更高的并发性。而对于只读的数据结构,其中包含的不变性可以完全不需要考虑加锁操作

读锁与读锁不互斥,读锁与写锁互斥,写锁与写锁互斥。

public class TestReadWriteLock {

    public static void main(String[] args) {
        ReadWriteLockDemo rw = new ReadWriteLockDemo();
        new Thread(new Runnable() {
            @Override
            public void run() {
                rw.set((int) (Math.random() * 101));
            }
        }, "Write:").start();
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    rw.get();
                }
            }).start();
        }
    }
}

class ReadWriteLockDemo {
    private int number = 0;
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    //读
    public void get() {
        lock.readLock().lock(); //上锁
        try {
            System.out.println(Thread.currentThread().getName() + " : " + number);
        } finally {
            lock.readLock().unlock(); //释放锁
        }
    }
    //写
    public void set(int number) {
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName());
            this.number = number;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

十一、线程八锁

使用线程常见的八种情况

1.两个普通同步方法,两个线程,打印 //one two

public class TestThread1Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getTwo();
            }
        }).start();
    }
}

class Number {
    public synchronized void getOne() {
        System.out.println("one");
    }
    public synchronized void getTwo() {
        System.out.println("two");
    }
}

2.新增 Thread.sleep() 给 getOne(),打印 //one two

public synchronized void getOne() {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("one");
}

3.新增普通方法 getThree() , 打印 //three one two

public class TestThread3Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getTwo();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getThree();
            }
        }).start();
    }
}

class Number {
    public synchronized void getOne() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("one");
    }
    public synchronized void getTwo() {
        System.out.println("two");
    }
    public void getThree() {
        System.out.println("three");
    }
}

4.两个普通同步方法,两个 Number 对象,打印 //two one

public class TestThread4Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        Number number2 = new Number();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number2.getTwo();
            }
        }).start();
    }
}

class Number {
    public synchronized void getOne() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("one");
    }
    public synchronized void getTwo() {
        System.out.println("two");
    }
}

5.修改 getOne() 为静态同步方法,打印 //two one

public class TestThread8Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getTwo();
            }
        }).start();
    }
}
class Number {
    public static synchronized void getOne() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("one");
    }
    public synchronized void getTwo() {
        System.out.println("two");
    }
}

6.修改两个方法均为静态同步方法,一个 Number 对象 //one two

public static synchronized void getTwo() {
        System.out.println("two");
    }

7.一个静态同步方法,一个非静态同步方法,两个 Number 对象 //two one

public class TestThread7Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        Number number2 = new Number();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number2.getTwo();
            }
        }).start();
    }
}
class Number {
    public static synchronized void getOne() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("one");
    }
    public synchronized void getTwo() {
        System.out.println("two");
    }
}

8.两个静态同步方法,两个 Number 对象 //one two

public class TestThread8Monitor {
    public static void main(String[] args) {
        Number number = new Number();
        Number number2 = new Number();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                number2.getTwo();
            }
        }).start();
    }
}
class Number {
    public static synchronized void getOne() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("one");
    }
    public static synchronized void getTwo() {
        System.out.println("two");
    }
}

总结:

①非静态方法的锁默认为 this, 静态方法的锁为对应的 Class 实例
②某一个时刻内,只能有一个线程持有锁,无论几个方法

十一、线程池

线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。

  • ExecutorService newFixedThreadPool() : 创建固定大小的线程池
  • ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量
  • ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程
  • ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务
public class TestThreadPool {
    public static void main(String[] args) throws Exception {
        //创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            //提交任务到线程池
            Future<Integer> future = pool.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for (int i = 0; i <= 100; i++) {
                        sum += i;
                    }
                    return sum;
                }
            });
            list.add(future);
        }
        //关闭线程池
        pool.shutdown();
        //查看结果
        for (Future<Integer> future : list) {
            System.out.println(future.get());
        }
}

class ThreadPoolDemo implements Runnable {
    private int i = 0;
    @Override
    public void run() {
        while (i <= 100) {
            System.out.println(Thread.currentThread().getName() + " : " + i++);
        }
    }
}

线程调度 ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令

public class TestScheduledThreadPool {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 5; i++) {
            Future<Integer> result = pool.schedule(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int num = new Random().nextInt(100);//生成随机数
                    System.out.println(Thread.currentThread().getName() + " : " + num);
                    return num;
                }
            }, 1, TimeUnit.SECONDS);//一秒后执行
            System.out.println(result.get());
        }
        pool.shutdown();
    }
}

十二、ForkJoinPool 分支/合并框架 工作窃取

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总

Divide-and-Conquer

img

//计算1加到100
public class Demo3 {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> js = new Js(1L,100L);
        Long restul = pool.invoke(js);
        System.out.println(restul);
    }
}
class Js extends RecursiveTask<Long>{
    private Long a;
    private Long b;
    public Js(Long a, Long b) {
        this.a = a;
        this.b = b;
    }
    @Override
    protected Long compute() {
        if (b-a < 10) {
            long sum = 0;
            for (Long i = a; i <= b; i++) {
                sum += i;
            }
            return sum;
        } else {
            Js left = new Js(a, (a+b)/2);
            left.fork(); //进行拆分,同时压入线程队列
            Js right = new Js((a+b)/2+1, b);
            right.fork(); //进行拆分,同时压入线程队列
            return left.join() + right.join();
        }
    }
}

Fork/Join 框架与线程池的区别:

采用 “工作窃取”模式(work-stealing):相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。

Demo地址