ThreadPoolExecutor - Java 线程池示例

Java thread pool 管理了 worker threads 池. 它包含一个排队等待执行的任务。 我们可以使用 ThreadPoolExecutor 来创建 Java 中的 thread pool. Java thread pool 管理了可运行 threads 的收集。 工人 threads 从排队执行可运行 threads. java.util.concurrent.Executors 提供工厂和支持方法的 java.util.concurrent.Executor 界面,以创建 Java 中的 thread pool. Executors 是一个实用类,还提供有用的方法来通过各种工厂方法与 ExecutorService, ScheduledExecutorService, ThreadFactory 和 Callable 类一起工作。 让我们写一个简单的程序来解释它在工作。 首先,我们需要有一个可运行

 1package com.journaldev.threadpool;
 2
 3public class WorkerThread implements Runnable {
 4
 5    private String command;
 6
 7    public WorkerThread(String s){
 8        this.command=s;
 9    }
10
11    @Override
12    public void run() {
13        System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
14        processCommand();
15        System.out.println(Thread.currentThread().getName()+" End.");
16    }
17
18    private void processCommand() {
19        try {
20            Thread.sleep(5000);
21        } catch (InterruptedException e) {
22            e.printStackTrace();
23        }
24    }
25
26    @Override
27    public String toString(){
28        return this.command;
29    }
30}

管理员示例

以下是测试程序类 SimpleThreadPool.java,我们正在从 Executors 框架创建固定线程池。

 1package com.journaldev.threadpool;
 2
 3import java.util.concurrent.ExecutorService;
 4import java.util.concurrent.Executors;
 5
 6public class SimpleThreadPool {
 7
 8    public static void main(String[] args) {
 9        ExecutorService executor = Executors.newFixedThreadPool(5);
10        for (int i = 0; i < 10; i++) {
11            Runnable worker = new WorkerThread("" + i);
12            executor.execute(worker);
13          }
14        executor.shutdown();
15        while (!executor.isTerminated()) {
16        }
17        System.out.println("Finished all threads");
18    }
19}

在上面的程序中,我们正在创建一个固定尺寸的5个工人线索的线索池,然后我们向这个池提交10个工作岗位,因为池的尺寸为5个,它将开始工作5个工作岗位,其他工作岗位将处于等待状态,一旦一个工作完成,另一个工作岗位从等待队伍中将被工人线索接到并被执行。

 1pool-1-thread-2 Start. Command = 1
 2pool-1-thread-4 Start. Command = 3
 3pool-1-thread-1 Start. Command = 0
 4pool-1-thread-3 Start. Command = 2
 5pool-1-thread-5 Start. Command = 4
 6pool-1-thread-4 End.
 7pool-1-thread-5 End.
 8pool-1-thread-1 End.
 9pool-1-thread-3 End.
10pool-1-thread-3 Start. Command = 8
11pool-1-thread-2 End.
12pool-1-thread-2 Start. Command = 9
13pool-1-thread-1 Start. Command = 7
14pool-1-thread-5 Start. Command = 6
15pool-1-thread-4 Start. Command = 5
16pool-1-thread-2 End.
17pool-1-thread-4 End.
18pool-1-thread-3 End.
19pool-1-thread-5 End.
20pool-1-thread-1 End.
21Finished all threads

输出确认,池中有五个线条名为pool-1-thread-1pool-1-thread-5,它们负责执行提交给池的任务。

ThreadPool执行者示例

Executors类提供简单的实施 ExecutorService使用 ThreadPoolExecutor,但ThreadPoolExecutor提供的功能比这要多得多。我们可以指定当我们创建ThreadPoolExecutor实例时将活着的线程的数量,我们可以限制线程池的大小,并创建自己的 RejectedExecutionHandler实现,以处理不能适合工人队列的任务。

 1package com.journaldev.threadpool;
 2
 3import java.util.concurrent.RejectedExecutionHandler;
 4import java.util.concurrent.ThreadPoolExecutor;
 5
 6public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
 7
 8    @Override
 9    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
10        System.out.println(r.toString() + " is rejected");
11    }
12
13}

ThreadPoolExecutor提供了几种方法,我们可以使用它们来查找执行器的当前状态、池大小、活跃线程数和任务数,所以我有一个监控线程,可以在一定的时间间隔上打印执行器信息。

 1package com.journaldev.threadpool;
 2
 3import java.util.concurrent.ThreadPoolExecutor;
 4
 5public class MyMonitorThread implements Runnable
 6{
 7    private ThreadPoolExecutor executor;
 8    private int seconds;
 9    private boolean run=true;
10
11    public MyMonitorThread(ThreadPoolExecutor executor, int delay)
12    {
13        this.executor = executor;
14        this.seconds=delay;
15    }
16    public void shutdown(){
17        this.run=false;
18    }
19    @Override
20    public void run()
21    {
22        while(run){
23                System.out.println(
24                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
25                        this.executor.getPoolSize(),
26                        this.executor.getCorePoolSize(),
27                        this.executor.getActiveCount(),
28                        this.executor.getCompletedTaskCount(),
29                        this.executor.getTaskCount(),
30                        this.executor.isShutdown(),
31                        this.executor.isTerminated()));
32                try {
33                    Thread.sleep(seconds*1000);
34                } catch (InterruptedException e) {
35                    e.printStackTrace();
36                }
37        }
38
39    }
40}

以下是使用 ThreadPoolExecutor的 thread pool 实现示例。

 1package com.journaldev.threadpool;
 2
 3import java.util.concurrent.ArrayBlockingQueue;
 4import java.util.concurrent.Executors;
 5import java.util.concurrent.ThreadFactory;
 6import java.util.concurrent.ThreadPoolExecutor;
 7import java.util.concurrent.TimeUnit;
 8
 9public class WorkerPool {
10
11    public static void main(String args[]) throws InterruptedException{
12        //RejectedExecutionHandler implementation
13        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
14        //Get the ThreadFactory implementation to use
15        ThreadFactory threadFactory = Executors.defaultThreadFactory();
16        //creating the ThreadPoolExecutor
17        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
18        //start the monitoring thread
19        MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
20        Thread monitorThread = new Thread(monitor);
21        monitorThread.start();
22        //submit work to the thread pool
23        for(int i=0; i<10; i++){
24            executorPool.execute(new WorkerThread("cmd"+i));
25        }
26
27        Thread.sleep(30000);
28        //shut down the pool
29        executorPool.shutdown();
30        //shut down the monitor thread
31        Thread.sleep(5000);
32        monitor.shutdown();
33
34    }
35}

请注意,当我们初始化 ThreadPoolExecutor 时,我们将初始池大小保持为 2,最大池大小保持为 4,工作队列大小保持为 2,所以如果有 4 个运行任务并提交了更多任务,工作队列只会持有 2 个,其余的将由RejectedExecutionHandlerImpl处理。

 1pool-1-thread-1 Start. Command = cmd0
 2pool-1-thread-4 Start. Command = cmd5
 3cmd6 is rejected
 4pool-1-thread-3 Start. Command = cmd4
 5pool-1-thread-2 Start. Command = cmd1
 6cmd7 is rejected
 7cmd8 is rejected
 8cmd9 is rejected
 9[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
10[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
11pool-1-thread-4 End.
12pool-1-thread-1 End.
13pool-1-thread-2 End.
14pool-1-thread-3 End.
15pool-1-thread-1 Start. Command = cmd3
16pool-1-thread-4 Start. Command = cmd2
17[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
18[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
19pool-1-thread-1 End.
20pool-1-thread-4 End.
21[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
22[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
23[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
24[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
25[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
26[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
27[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
28[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

注意执行器的活动、完成和总完成任务数的变化。我们可以调用 **shutdown() 方法来完成所有提交的任务的执行,并终止线程池。如果您想计划一个任务以延迟或定期运行,那么您可以使用 ScheduledThreadPoolExecutor 类。 在 **[Java Schedule ThreadPoolExecutor](/community/tutorials/java-scheduler-scheduledexecutorservice-scheduledthreadpooleexecutor-exampleJava ScheduledThreadPoolExecutor 示例以延迟和执行期间后安排任务)**中阅读更多。

Published At
Categories with 技术
Tagged with
comments powered by Disqus