Java基础(八)—— 线程与异步

本文最后更新于:2021年9月29日 凌晨

概览:Java多线程与异步

Java 多线程 并发

并发&并行

  • 并发:逻辑上多个线程一起运行,实际上某个时刻只有一个线程在执行的现象称为并发。
  • 如果CPU支持超线程技术(也就是一个核心可以同时运行多个线程)或者CPU有多个核心,那么多线程就会并行执行。

Thread类和Runnable接口

1
2
3
4
5
6
7
8
9
10
11
class Thread implements Runnable{

private Runnable target;

@Override
public void run() {
if (target != null) {
target.run();
}
}
}

Thread类实现了Runnable接口,内部有Runnable的指向的对象target。

接口不可以实例化,但是接口变量可以指向该接口实现类的对象;

然后实现了Runnable接口的实现类,需要创建实例对象传递给Thread()的构造方法中,再启动线程。

1
2
3
4
5
6
7
8
9
10
11
public class Thread1 implements Runnable{
public static void main(String[] args) {
System.out.println(Thread.currentThread().getName() + "正在运行");
new Thread(new Thread1()).start();
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "子线程 正在运行");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 匿名内部类
Thread thread = new Thread(new Runnable(){
@override
public void run(){
for(int i=0;i<10;i++){
System.out.println(i);
}
}
});

// Lambda表达式
Thread thread = new Thread(()-> {
for(int i = 0;i<10;i++){
System.out.println(i);
}
});

主线程与子线程的关系

  • 正常情况下主线程结束运行也不会影响子线程的运行。
  • 某一个线程执行出错也不会影响其他线程的执行。
  • setDaemon()方法,设置了之后,主线程退出时,子线程都会跟着结束。

sleep

最好不要使用Thread.sleep()方法来协调线程之间的工作。

jion

当前线程等待thread代表的线程结束👍

  • interrupt()方法
  • 结论:如果你对一个处于一个活跃的运行状态的

并不在阻塞型并且看以响应interrupt的io操作时,这个线程实际上并不会受到任何干扰,但其标记位会被设置为true。

Thread类和Runnable接口的区别

1


守护线程 Daemon Thread

  • Java中有两类线程,用户线程和守护线程。
  • 守护线程:指程序运行的时候在后台提供一种通用服务的线程。
  • 当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中所有的守护线程。反过来,只要非守护线程还在运行,程序就不会终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TestDeamon {
public static void main(String[] args) {

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
for (int i = 0; i < 100; i++) {
System.out.println(name + "---"+i);
}
}
});

Thread t2 = new Thread(() -> {
String name = Thread.currentThread().getName();
for (int i = 0; i < 10; i++) {
System.out.println(name + "---"+i);
}
});

t1.setDaemon(true); // 要先于start()
t1.start();
t2.start();
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
Thread-0---0
Thread-1---0
Thread-1---1
Thread-1---2
Thread-1---3
Thread-1---4
Thread-1---5
Thread-1---6
Thread-1---7
Thread-1---8
Thread-1---9
Thread-0---1
  • t1作为守护线程运行,当用户线程执行结束之后,它就会立刻停止运行。

注意:

  • setDeamon()要在start()之前,否则会有IllegalThreadStateException异常的出现。即不能把正在运行的线程设置为守护线程
  • 守护线程应该永远不去访问固有资源,例如文件、数据库等,它可能会在任何时刻的操作中间发生中断。
  • 若main方法中仅有一个线程t1,并且设置为了守护线程,那么其内的run()永远都不会执行。
  • 守护线程创建的线程仍然也是守护线程,使用isDeamon()可以判断是否是守护线程。

https://blog.csdn.net/shimiso/article/details/8964414


并发编程 —— Executor

java.util.concurrent

  • 并发任务的提交
  • 线程安全集合类
  • CAS类,如AtomicInteger
  • 线程协调类,如Semaphore

Executor

  • 将线程创建、维护和任务分离,“线程池化”,避免Thread创建、启动消耗。

接口:Callable,仅有一个方法,它是有返回值的。

  • 当需要子任务返回给一个结果的时候,可以使用这样的接口。
1
2
3
public interface Callable<v>{
v call() throws Exception;
}

接口:Future,获得子任务执行的结果,get()

  • get():无限制的等待下去
  • get(params):有参

接口:ScheduledExecutorService,执行定时任务的,返回值可选。

生成线程池:Executors

  • 不推荐使用,使用自定义的线程池

  • newFixedThreadPool(int):区别

  • newCachedThreadPool():

  • 7分钟 getcouse


Lock ReentrantLock

  • Lock最基本的

  • ReentrantLock 表示锁可重入,表示一个线程第一次获得这个锁之后,如果还没有释放,那么它还可以继续获取这个锁。

    1
    2
    3
    public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() : new NonfairSync();
    }
  • 锁的fair:一个后来的线程,是不是能够先获得这个锁?能获得,公平?不公平?

使用

  • Synchronized 比较重,但是做了很多优化之后,有待商榷。
  • Synchronized是一个非公平的锁
  • Synchronized也不需要手动释放锁
  • ReadWriteLock:可重入。写本身需要枷锁,读时可以并发的去读。

CAS

  • Compare And Set/Swap 对比 volatile
  • 当要修改某一值的瞬间时,比较当前值和预期值是否一致
    • 一致时,才会进行修改,否则不进行任何操作。
  • 原子类操作
    • AtomicInteger
    • Atomiclong,专门的计数器类 LongAdder
    • AtomicBoolean
    • 对于自定义对象,AtomicReference
    • 使用反射进行原子类操作,AtomicXXXFieldUpdater

多线程协调相关

Semaphore 信号量

permits count

实际上时控制资源总的同时访问个数

  • 对于信号量来说,必须在获取资源之后把它释放掉

如何动态的修改permit的个数?

  • 提示:release方法是可以多次调用的

Semaphore是一个计数信号量,必须由获取它的线程释放,通常用于限制可以访问某些资源线程数目,信号量控制的是线程并发的数量。

计数器:一个信号量有且仅有三种操作,且全部是原子的,初始化、增加、减少。

  • 增加:为一个线程解除阻塞
  • 减少:让一个线程进入阻塞

原理理解

Semaphore是用来保护一个或者多个共享资源的访问,Semaphor内部维护了一个计数器,其值为可以访问的共享资源的个数,一个线程要访问共享资源,先要去获得信号量,如果信号量保存的计数器值大于1,则意味着可以有共享资源进行访问,访问之后,计数值减一,再去访问共享资源。

如果计数器值为0,则线程就会进入休眠状态,当某个线程使用完共享资源之后,释放信号量,并将信号量内部的计数器+1,之前进入休眠状态的线程将会被唤醒并再次试图获得信号量。

CountDownLatch

one wait for all

CyclicBarrier

all wait for all

  • 当所有的子任务都需要阶段性的执行一个个任务时

线程池

  • 降低资源的消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当有任务时,任务可以不需要等待到线程创建就能立即执行。
  • 提高线程的可管理性:线程池可以进行统一的分配、调优和监控。

Executor框架

https://blog.csdn.net/tongdanping/article/details/79604637

  • 用于统一创建和运行的接口,实现线程池的功能,将工作单元和执行机制分离。

包含三大部分:

  • 任务:工作单元,宝库哦被执行任务需要实现的接口,Runnable接口,Callable接口。
  • 任务的执行:把任务分派给多个线程的执行机制,包括Executor接口以及继承自Executor接口的ExecutorService接口。
    • 异步任务的提交:submit(Callable)方法可以提交多线程异步运算。
  • 异步计算的结果:包含Future接口,以及实现了Future接口的FutureTask接口。

Callable & Future

https://www.cnblogs.com/flydean/p/12680281.html

视频理解:https://www.bilibili.com/video/BV1ov41167vH

https://www.bilibili.com/video/BV1n64y1o7S4?p=11

  • 底层基于LockSupport

Runnable缺陷 & Callable改进

1
public void run(){}
  • 没有返回值。

  • 但是某些特定程序需要异步执行的返回结果。

  • Callable接口时Runnable的增强版,其提供了一个call方法,可以看作时线程的执行体。

    • call()可以有返回值
    • call()可以声明抛出异常

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadCallable implements Callable<Integer>{
// 线程要返回结果,结果类型为Integer

@override
public Integer call() throws Exception{
try{
Thread.sleep(1000);
}catch (Exception e){

}
return 1;
}


public void test(){
ThreadCallable threadCallable = new ThreadCallable();
FutureTask<Integer> inteFuTa = new FutureTask<>(threadCallable);
new Thread(inteFuTa).start();
Integer result = inteFuTa.get();
System.out.println(result);
}

}

Future

一直以来都对FutureTask这个“Future”不理解,为什么叫做“未来的任务呢”?这个“Future”体现在哪里呢?现在终于明白,FutureTask的Future就源自于它的异步工作机制,如果我们在主线程中直接写一个函数来执行任务,这是同步的任务,也就是说必须要等这个函数返回以后我们才能继续做接下的事情,但是如果这个函数返回的结果对接下来的任务并没有意义,那么我们等在这里是很浪费时间的,而FutureTask就提供了这么一个异步的返回结果的机制,当执行一个FutureTask的时候,我们可以接着做别的任务,在将来的某个时间,FutureTask任务完成后会返回FutureTask对象来包装返回的结果,只要调用这个对象的get()方法即可获取返回值。
https://blog.csdn.net/tongdanping/article/details/79630637

Future用于异步执行结果的获取,代表结果,当异步执行结束后,返回的结果就会保存在Future中。

使用场景:当执行一个长时间运行的任务时,使用Future就可以暂时去处理其他任务,等长任务执行完毕后再返回其结果。

  • 计算密集场景
  • 处理大数据量
  • 远程方法调用

创建和使用:

1
2
// ExecutorService
<T> Future<T> submit(Callable<T> task);

ExecutorService中定义的一个submit方法,它接收一个Callable参数,并返回一个Future

Callable需要实现一个call方法,并返回结果。

1
2
3
4
5
6
7
8
9
ExecutorService executor = Executors.newSingleThreadExecutor();

Future<Integer> calculate(Integer input){
return executor.submit(() -> {
System.out.println("Calculating..." + input);
Thread.sleep(1000);
return input*input;
});
}

从Future中获取结果

1
2
3
4
5
6
Future<Integer> futureOne = calculate(100);// 调用上面的方法
while(!futureone.isDone()){
sout:"Calcuting";
Thread.sleep(3000);
}
Integer result = futureOne.get();
  • 首先使用isDone()来判断异步操作是否执行完毕
  • 执行完毕则使用get()来获取结果。
  • 取消future:Future.cancel(boolean),传入参数true,取消。

FutureTask详解 https://blog.csdn.net/qq_39654841/article/details/90631795

  • 可以把FutureTask交给Executor执行;也可以通ExecutorService.submit(…)方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法。除此以外,还可以单独使用FutureTask。

https://flydean.blog.csdn.net/article/details/105191889

Guava Futures

successfulAsList(Iterable<ListenableFuture<V>>):返回一个ListenableFuture,该Future的结果包含所有成功的Future,按照原来的顺序,当其中之一Faied或者cancel时,使用null替代。

https://blog.csdn.net/qq496013218/article/details/77522820

  • 传统JDK的Future,通过异步的方式计算返回结果:在多线程运算中可能没有结束返回结果,Future是运行中的线程的一个引用句柄,确保在服务执行返回一个Result。
  • Guava的ListenableFuture可以允许注册回调方法,在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后 立即执行。这样能够支持更多的操作。
    • addListener(Runnable,Executor)方法会在多线程运算完之后,指定的Runnable参数会被指定的Executor执行。
    • 添加回调Futures.addCallback(ListenableFuture, FutureCallback, Executor)

CompletableFuture

https://flydean.blog.csdn.net/article/details/105191889

https://blog.csdn.net/tongtest/article/details/107549749

https://blog.csdn.net/tongtest/article/details/107577226

  • 是一个Future的实现类,基本的功能可以获取异步执行的结果,取消正在执行的任务等。
  • FutureTask也是Future的实现类。
1
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 
  • CompletionStage可以来实现异步调用的链式操作,控制异步程序的执行顺序。(使用thenXXX来实现)。

使用:

1
2
3
4
5
6
7
CompletableFuture.complete(); //可以立刻返回结果
CompletableFuture.completedFuture();//直接返回一个Future
CompletableFuture.cancle(true);//取消任务执行

// 获取Future的最终计算结果
T result = Completable.join();
T result = Completeble.get();
  • supplyAsync创建

    1
    2
    3
    4
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

    // 指定线程池,将任务提交给线程池进行处理。
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
    1
    2
    3
    4
    5
    6
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "test";
    });

    String result = future.join();
    System.out.println("get result: " + result);
  • runAsync创建,Runnable类型的无返回值!

    1
    2
    3
    public static CompletableFuture<Void> runAsync(Runnable runnable);

    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
    1
    2
    3
    4
    5
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("runAsync");
    });

    System.out.println("get result: " + future.get());

    https://blog.csdn.net/cainiao_user/article/details/7642349

  • thenCompose的方法的效果,是将前一个任务的结果转交给下一个线程。

  • thenCombine的作用是将上一个任务和下一个任务一起执行,得到两个结果,再把两个结果加工成一个结果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // 常用流式连接函数
    // 带有Async的后缀的函数表示需要连接的后置任务会被单独提交到线程池之中,从而相对于前置任务来说是异步运行的。
    thenApply
    thenApplyAsync

    thenAccept
    thenAcceptAsync

    thenRun
    thenRunAsync

    thenCombine
    thenCombineAsync

    thenCompose
    thenComposeAsync

    whenComplete
    whenCompleteAsync

    // whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。

    handle
    handleAsync
  • handle

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("future1 run");
    return 1;
    });

    CompletableFuture<Integer> future2 = future1.handle((r,e) -> {
    if(e != null){
    System.out.println("err!");
    return r;
    }else{
    System.out.println("future2 run");
    return r + 1;
    }
    });

    System.out.println(future2.join());

函数式接口

Supplier

  • 一个函数式接口
  • 中文:供应者
1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Supplier<T> {

/**
* Gets a result.
*
* @return a result
*/
T get();
}

Function

  • Represents a function that accepts one argument and produces a result.
1
2
3
4
5
6
7
8
9
10
11
12
@FunctionalInterface
public interface Function<T, R> {

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
// ...
}

BiFunction

  • Represents a function that accepts two arguments and produces a result. This is the two-arity specialization of Function.
1
2
3
4
5
6
7
8
9
10
11
12
13
@FunctionalInterface
public interface BiFunction<T, U, R> {

/**
* Applies this function to the given arguments.
*
* @param t the first function argument
* @param u the second function argument
* @return the function result
*/
R apply(T t, U u);
// ...
}

Java多线程

run与start

1
2
3
4
5
6
7
8
9
10
11
12
public class TestRun extends Thread{

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "<是子线程>");
}

public static void main(String[] args) {
//new TestRun().run();
new TestRun().start();
}
}
  • start是开启一个新的线程,会有多条执行路径。主线程和子线程并行交替执行。
  • run则是简单的执行此函数!

执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestRun extends Thread{

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "<是子线程>");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"<线程睡眠完成>");
}

public static void main(String[] args) {
System.out.println("主线程开始执行");
new TestRun().start();
new TestRun().start();
System.out.println("主线程执行结束");

}
}
1
2
3
4
5
6
主线程开始执行
主线程执行结束
Thread-0<是子线程>
Thread-1<是子线程>
Thread-1<线程睡眠完成>
Thread-0<线程睡眠完成>
  • 若子线程的run方法报错了,是否会影响主线程呢?
  • 答:不会的!各自独立执行。

实现Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadRunnable implements Runnable{

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "<是子线程>");
}

public static void main(String[] args) {
// 启动线程

// 方式一:
new Thread(new ThreadRunnable()).start();

// 方式二:使用匿名内部类形式创建
new Thread(new Runnable(){
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "<是子线程>");
}
}).start();

// 方式三:Lambda
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "<是子线程>");

}).start();

}
}

sleep

sleep()会使得一个运行态的程序让出CPU的执行权限。

sleep()结束之后,会转变为就绪状态,等待CPU

线程安全

全局变量存放于堆内存之中


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!