美团原题:深入探讨Java中的线程管理:Thread、Runnable、Callable、Future和FutureTask之间的关系及应用实例

  1. Thread与Runnable的内在联系

1.1 线程的基本概念

我们从最简单的Thread示例开始了解其基本使用:

public class MyThread extends Thread {  
    public MyThread(String name) {  
        super(name);  
    }  
    @Override  
    public void run() {  
        String name = Thread.currentThread().getName();  
        System.out.println(name + "已经运行");  
    }  
    public static void main(String[] args) {  
        new MyThread("线程一").start();  
    }  
}

线程在运行过程中有四种状态:创建 -> 就绪 -> 运行 -> 结束。当调用start()方法后,线程会进入就绪状态,随后在获取到CPU调度资源后进入运行状态,执行run()方法,最后结束。

1.2 Runnable接口的使用

接下来,我们来看Runnable接口的基本实现:

public class MyTask implements Runnable {  
    @Override  
    public void run() {  
        String name = Thread.currentThread().getName();  
        System.out.println(name + "已经运行");  
    }  
    public static void main(String[] args) {  
        new Thread(new MyTask(),"线程二").start();  
    }  
}

在这个示例中,MyTask实现了Runnable接口并重写了run()方法,作为Thread的构造参数。

尽管许多人都了解如何使用Runnable,但其工作原理你是否清楚呢?

1.3 Thread和Runnable的联系

我们来看Runnable接口的定义:

public interface Runnable {  
    /**  
     * 当实现Runnable接口的对象用于创建线程时,启动该线程将调用该对象的run方法。  
     */  
    public abstract void run();  
}

这一段可以简单理解为:当一个对象实现了run()方法并被用于启动一个线程时,start()的调用会使得该对象的run()方法在新线程中被执行。

由此可以总结出Thread和Runnable的关系:

  1. MyTask实现了Runnable接口并提供了run()方法;
  2. Thread在初始化时将MyTask实例作为目标;
  3. 当Thread的run()方法被调用时,实际上是执行MyTask.run()方法。

以下是Thread与Runnable的关系图示:

图片图片

1.4 策略模式的应用

在这个过程中,Thread初始化时将MyTask作为参数赋值给Thread.target,最终在调用Thread.run()时,执行的是target.run(),即MyTask.run()。这体现了策略模式的运用。

  1. Callable、Future和FutureTask的关系解析

接下来,我们要探讨Callable、Future和FutureTask之间的关系:

图片

初见此图,Java的线程机制似乎显得复杂,尤其是已经有Thread和Runnable两个创建线程的方案,为什么还要增加Callable、Future和FutureTask这三者呢?

实际上,Thread和Runnable的run()方法都没有返回值,并且不能抛出异常,因此在需要返回多线程结果时,我们需要借助Callable和Future。

2.1 Callable接口

Callable是一个泛型接口,定义了一个返回值类型为V的call()方法:

public interface Callable<V> {  
    /**  
     * 计算结果,若无法计算则抛出异常。  
     * @return 计算结果  
     * @throws Exception 若无法计算结果  
     */  
    V call() throws Exception;  
}

通常我们会以匿名类的方式使用Callable,call()中包含具体的业务逻辑:

Callable<String> callable = new Callable<String>() {  
    @Override  
    public String call() throws Exception {  
        // 执行业务逻辑 ...  
        return "this is Callable is running";  
    }  
};

此时,Callable.call()与Thread.run()之间的关系又是什么呢?

2.2 FutureTask的实现

从关系图中可以看到,FutureTask实现了RunnableFuture接口,而RunnableFuture又继承了Runnable和Future:

public interface RunnableFuture<V> extends Runnable, Future<V> {  
    /**  
     * 设置这个Future的计算结果,除非它已经被取消。  
     */  
    void run();  
}

因此,FutureTask本质上也是一个Runnable实例!

这就产生了新的思路,既然FutureTask是Runnable,那它可以作为Thread的构造参数:

new Thread(FutureTask对象).start();

因此,当执行Thread.run()时,实际上执行的就是FutureTask.run()。

接下来我们要探讨FutureTask.run()和Callable.call()之间的关系。

2.3 Callable与FutureTask的内在协调

初始化FutureTask时,必须将Callable对象作为参数传入:

图片

当执行FutureTask.run()时,内部实际上执行的是Callable.call():

图片

这又是一个策略模式的运用!

通过以上分析,我们可以清晰地理解Thread、Runnable、FutureTask和Callable之间的关系:

  • Thread.run()调用的是Runnable.run();
  • FutureTask继承Runnable,并实现了FutureTask.run();
  • FutureTask.run()调用的是Callable.call();
  • 最终,Thread.run()实际上是执行Callable.call()。

因此,整个设计思路实际上是两个策略模式的结合,Thread和Runnable构成了一个策略模式,而FutureTask和Callable又构成了另一个策略模式。这两个策略模式通过Runnable和FutureTask的继承关系整合在一起。

2.4 对Future的必要性进行探讨

那么,Future的存在到底有什么意义呢?当我们通过FutureTask并借助Thread执行线程后,结果数据如何获取?这便是Future的用武之地。

我们来看一下Future接口的定义:

public interface Future<V> {  
    // 取消任务,如果任务正在运行,mayInterruptIfRunning为true时,会打断任务并返回true;否则,会等待任务执行完毕并返回true;若任务未执行则返回true,若已执行完则返回false  
    boolean cancel(boolean mayInterruptIfRunning);  
    // 判断任务是否被取消,正常执行完不算被取消  
    boolean isCancelled();  
    // 判断任务是否已完成,任务取消或发生异常也算完成,返回true  
    boolean isDone();  
    // 获取任务返回结果,若任务尚未完成则会阻塞等待,若获取过程中发生异常则抛出异常  
    V get() throws InterruptedException, ExecutionException;  
    // 在指定的时间内若未返回结果则抛出TimeoutException  
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;  
}

对于FutureTask而言,Callable就是它的任务,而FutureTask内部维护了任务状态,所有状态的变化都与任务的执行密切相关。FutureTask实现了Future接口,提供了对任务的取消、结果获取以及状态检查等功能。

例如,我们可以使用get()方法来获取结果,如果任务尚未完成,当前线程会被阻塞,直到任务完成后唤醒。

  1. 具体实例

下面是一个简单的多线程处理示例:

private static List<String> processByMultiThread(Integer batchSize) throws ExecutionException, InterruptedException {  
    List<String> output = new ArrayList<>();  
    // 获取分批数据  
    List<List<Integer>> batchProcessData = getProcessData(batchSize);  
    // 启动线程  
    List<FutureTask<List<String>>> futureTaskList = new ArrayList<>();  
    for (List<Integer> processData : batchProcessData) {  
        Callable<List<String>> callable = () -> processOneThread(processData);  
        FutureTask<List<String>> futureTask = new FutureTask<>(callable);  
        new Thread(futureTask).start();  // 启动线程  
        futureTaskList.add(futureTask);  
    }  
    // 获取线程返回的数据  
    for (FutureTask futureTask : futureTaskList) {  
        List<String> processData = (List<String>) futureTask.get();  
        output.addAll(processData);  
    }  
    return output;  
}

这个示例的流程是:

  1. 将数据分批处理;
  2. 启动对应数量的线程来执行任务;
  3. 通过futureTask.get()来获取每个线程的返回数据,并进行汇总。

值得注意的是,这个示例在实际生产环境中可能不是最佳选择,因为每次调用都会创建新的线程,从而可能导致内存资源的浪费。因此,在实际应用中,建议使用线程池进行管理。

关于完整示例代码以及线程池的使用,请查阅githubhttps://github.com/lml200701158/java-study/blob/master/src/main/java/com/java/parallel/share/MultiThreadProcess.java