Java BlockingQueue 示例

java.util.concurrent.BlockingQueue是一个 Java 队列,它支持等待队列在检索和删除元素时变成非空的操作,并在添加元素时等待空间在队列中可用。

Java 区块链

Java BlockingQueue Java BlockingQueue doesn't accept null values and throw NullPointerException if you try to store null value in the queue. Java BlockingQueue implementations are thread-safe. All queuing methods are atomic in nature and use internal locks or other forms of concurrency control. Java BlockingQueue interface is part of java collections framework and it's primarily used for implementing producer consumer problem. We don't need to worry about waiting for the space to be available for producer or object to be available for consumer in BlockingQueue because it's handled by implementation classes of BlockingQueue. Java provides several BlockingQueue implementations such as ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue etc. While implementing producer consumer problem in BlockingQueue, we will use ArrayBlockingQueue implementation. Following are some important methods you should know.

  • put(E):此方法用于将元素插入队列中.如果队列满,则等待空间可用.
  • E take():此方法从队列的头部获取并删除元素。

现在让我们使用 Java BlockingQueue 来实现生产者消费者问题。

Java BlockingQueue 示例 - 信息

只是一个正常的Java对象,它将由制作人生产并添加到队列中,你也可以称之为负载或队列消息。

 1package com.journaldev.concurrency;
 2
 3public class Message {
 4    private String msg;
 5
 6    public Message(String str){
 7        this.msg=str;
 8    }
 9
10    public String getMsg() {
11        return msg;
12    }
13
14}

Java BlockingQueue 例子 - 制作人

生产者类,将创建消息,并将其放在队列中。

 1package com.journaldev.concurrency;
 2
 3import java.util.concurrent.BlockingQueue;
 4
 5public class Producer implements Runnable {
 6
 7    private BlockingQueue<Message> queue;
 8
 9    public Producer(BlockingQueue<Message> q){
10        this.queue=q;
11    }
12    @Override
13    public void run() {
14        //produce messages
15        for(int i=0; i<100; i++){
16            Message msg = new Message(""+i);
17            try {
18                Thread.sleep(i);
19                queue.put(msg);
20                System.out.println("Produced "+msg.getMsg());
21            } catch (InterruptedException e) {
22                e.printStackTrace();
23            }
24        }
25        //adding exit message
26        Message msg = new Message("exit");
27        try {
28            queue.put(msg);
29        } catch (InterruptedException e) {
30            e.printStackTrace();
31        }
32    }
33
34}

Java BlockingQueue 例子 - 消费者

将从排队处理消息的消费类,并在收到退出消息时终止。

 1package com.journaldev.concurrency;
 2
 3import java.util.concurrent.BlockingQueue;
 4
 5public class Consumer implements Runnable{
 6
 7private BlockingQueue<Message> queue;
 8
 9    public Consumer(BlockingQueue<Message> q){
10        this.queue=q;
11    }
12
13    @Override
14    public void run() {
15        try{
16            Message msg;
17            //consuming messages until exit message is received
18            while((msg = queue.take()).getMsg() !="exit"){
19            Thread.sleep(10);
20            System.out.println("Consumed "+msg.getMsg());
21            }
22        }catch(InterruptedException e) {
23            e.printStackTrace();
24        }
25    }
26}

Java BlockingQueue 示例 - 服务

最后,我们必须为生产者和消费者创建BlockingQueue服务,这个生产者消费者服务将创建固定尺寸的BlockingQueue,并与生产者和消费者共享。

 1package com.journaldev.concurrency;
 2
 3import java.util.concurrent.ArrayBlockingQueue;
 4import java.util.concurrent.BlockingQueue;
 5
 6public class ProducerConsumerService {
 7
 8    public static void main(String[] args) {
 9        //Creating BlockingQueue of size 10
10        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
11        Producer producer = new Producer(queue);
12        Consumer consumer = new Consumer(queue);
13        //starting producer to produce messages in queue
14        new Thread(producer).start();
15        //starting consumer to consume messages from queue
16        new Thread(consumer).start();
17        System.out.println("Producer and Consumer has been started");
18    }
19
20}

上面的 Java BlockingQueue 示例程序的输出如下所示。

 1Producer and Consumer has been started
 2Produced 0
 3Produced 1
 4Produced 2
 5Produced 3
 6Produced 4
 7Consumed 0
 8Produced 5
 9Consumed 1
10Produced 6
11Produced 7
12Consumed 2
13Produced 8
14...

[Java Thread Sleep](/community/tutorials/thread-sleep-java Java Thread Sleep Example)在生产者和消费者中被用来产生和消耗一些延迟的消息。

Published At
Categories with 技术
Tagged with
comments powered by Disqus