手写理解Callable,Future,Executor

5个月前 (12-09) 0 点赞 0 收藏 0 评论 8 已阅读

前言

Callable,Future,Executor都是java.util.concurrent包下的工具类,作者李二狗,为了彻底吃透它们的概念,今天就假设这些类都不存在,自己通过实际场景封装出这些工具的山寨版

需求

假设你需要写一个简单的方法,两个值求和,非常简单

public int sum(int x, int y) {
    return x + y;
}

但需求增加了,需要计算的过程在一个新线程中执行,这代码该怎么写?就会出现以下两个问题:

怎么获取到线程执行的结果?
怎么知道新线程什么时候执行完?

实现

首先第一个问题,如何获取新线程结果,这个也好解决,虽然新线程里的变量我取不到,但内存是线程共享的啊,只要提前定义一个结果变量即可

private volatile int outcome;

public int sum(int x, int y) {
    new Thread(()->{
        outcome = x + y;
    }).start();
    return outcome;
}

很明显,最终结果肯定不对,因为返回的时候新线程可能都没开始运行,这就是第二个问题:怎么知道新线程什么时候执行完?

解决的方案当然也很多,只要线程之间进行通讯一下即可,我们用LockSupport方法来实现通讯

public class OperationTest {

    private volatile int outcome;

    private Thread waitThread;

    public int sum(int x, int y) throws InterruptedException {
        waitThread = Thread.currentThread();
        new Thread(() -> {
            outcome = x + y;
            // 计算完成通知等待线程
            LockSupport.unpark(waitThread);
        }).start();
        // 等待计算完成
        LockSupport.park(this);
        return outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new OperationTest().sum(2,3));
    }
}

此时我们就完成了这个需求,但看一下代码真的好麻烦啊,明明就是一个1+1等于几的事,写了这么多代码,如果明天再来个写减法的需求,我还要重写这么一堆

封装

其实上面解决的两个问题,完全就是通用性的问题,加法这样处理,减法一样也是这个解决思路,那么我们就可以把加法、减法等有返回值的方法用函数式接口给抽象化

// 使用泛型兼容各种类型返回
@FunctionalInterface
public interface Callable<V> {
    V call();
}

接下来我们就要封装一个开启新线程执行它的工具,提供如下功能:

只要传入一个方法作为参数,就可以开启一个新线程去执行这个方法,并返回执行结果
也可以开启新线程执行普通的Runable方法

这个工具命名为ExecutorService

public interface ExecutorService {
    /**
     * 执行callable并返回执行结果
     * @param task
     * @param <T>
     */
    <T> T submit(Callable<T> task);

    /**
     * 也可以执行Runnable
     * @param runnable
     */
    void execute(Runnable runnable);
}

然后开始实现这个工具,暂时叫做NewThreadExecutor(新线程执行器)

public class NewThreadExecutor implements ExecutorService {

    private volatile Object outcome;

    private Thread waitThread;

    @Override
    public <T> T submit(Callable<T> task) {
        waitThread = Thread.currentThread();
        execute(()->{
            outcome = task.call();
            // 计算完成通知等待线程
            LockSupport.unpark(waitThread);
        });
        // 等待计算完成
        LockSupport.park(this);
        return (T) outcome;
    }

    @Override
    public void execute(Runnable runnable) {
        // 执行方式就是开启一个线程去执行
        new Thread(runnable).start();
    }

}

这时我们再实现上面的需求就轻而易举了

int x = 2;
int y = 3;
Integer sub = new NewThreadExecutor().submit(() -> {
    return x + y;
});
System.out.println(sub);

这样通过我们的逻辑和执行解耦,可以方便使用工具执行减法、乘法或其它复杂运算逻辑

多线程

再回头看一下我们这个工具,实际上非常不合理,开了一个新线程去执行函数,整个过程主线程却全程傻等

相当于一个主管带一个员工干活,而员工干活时,主管干不了别的事只能等着,那干脆主管自己干得了呗,何必聘请这么一个员工

而我们希望开启新线程后主线程可以去干别的(比如分配新任务给其它线程执行),等全分配完任务再统一获取结果,这样才算是多线程并行作业

那么如何改造代码呐?

首先,调用submit方法不能阻塞,应该直接返回一个对象,主线程再想要获取的时候,才通过这个对象阻塞获取结果

这个对象不是运行结果,但通过它可以获得结果,他就像一个未来的约定,我们先使用代码给它抽象出来,命名为Future

public interface Future<V> {
    // 是否运行完成
    boolean isDone();
    // 获取运行结果
    V get();
}

此时我们的ExecutorService返回结果变为Future对象

public interface ExecutorService {
    /**
     * 执行callable并返回future
     * @param task
     * @param <T>
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 也可以执行Runnable
     * @param runnable
     */
    void execute(Runnable runnable);
}

那么此时如何改造NewThreadExecutor这个实现呐?

首先要实现Future抽象,这个对象可以获取到执行结果,那么它肯定可以访问到存储执行结果的对象(outcome)和等待线程对象(waitThread),那不妨就把这两个对象放入Future实现中,同时最终执行的Runable方法也要可以访问到这两个对象,那不妨就让Future的实现同时就是最终执行的Runable,即可执行的Future,取名为FutureTask

public class FutureTask<V> implements Future<V>, Runnable {

    private Callable<V> callable; // 要执行的方法

    private volatile Object outcome; // 执行结果

    private Thread waitThread; // 等待的线程

    public FutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public boolean isDone() {
        return outcome!=null;
    }

    @Override
    public V get() {
        waitThread = Thread.currentThread();
        if (isDone()) { // 如果已经执行完直接返回
            return (V) outcome;
        }
        // 否则等待
        LockSupport.park(this);
        return (V) outcome;
    }

    @Override
    public void run() {
        // 开始执行
        outcome = callable.call();
        // 计算完成通知等待线程
        LockSupport.unpark(waitThread);
    }
}

此时NewThreadExecutor改造如下

public class NewThreadExecutor implements ExecutorService {

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        execute(futureTask);
        return futureTask; // 直接返回
    }

    @Override
    public void execute(Runnable runnable) {
        // 执行方式就是开启一个线程去执行
        new Thread(runnable).start();
    }

}

这时我们就可以让两个子线程分别同时计算两个结果,最终主线程求和(真正的做到多线程计算)

Future<Integer> future1 = new NewThreadExecutor().submit(() -> {
    return 3 + 4;
});
Future<Integer> future2 = new NewThreadExecutor().submit(() -> {
    return 1 + 2;
});
System.out.println(future1.get()+future2.get());

扩展

以上封装的工具,达到了传入一个方法开启一个新线程计算的功能,并且使用future概念避免了阻塞

但工具还能再扩展一下,比如有一天领导让实现传入一个方法指定某一线程执行,或传入方法从几个固定线程中选一个空闲的去执行(线程池)

由于我们做到了逻辑和执行的分离解耦,所以只要重写一下execute就可以了,而无论如何执行submit的逻辑是不变的,我们可以继续给它抽象出来,命名为AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        execute(futureTask);
        return futureTask; // 直接返回
    }

}

此时它的继承者就可以传入方法并返回future,只需关注如何执行即可,比如我们的开启新线程执行工具

public class NewThreadExecutor extends AbstractExecutorService {

    @Override
    public void execute(Runnable runnable) {
        // 执行方式就是开启一个线程去执行
        new Thread(runnable).start();
    }

}

再比如使用线程池去执行

public class ThreadPoolExecutor extends AbstractExecutorService {
    @Override
    public void execute(Runnable runnable) {
        // 从线程池中选一个线程去执行
    }
}

最后

以上代码的命名基本参照jdk的源码,可以自行对照,相信再看源码就会非常清晰,也可以结合Executor源码详解解读源码


手写理解Callable,Future,Executor

本文收录在
0评论

登录

忘记密码 ?

切换登录

注册