阻塞队列

一. 概念

阻塞队列是⼀种特殊的队列.也遵守"先进先出"的原则.

阻塞队列能是⼀种线程安全的数据结构,并且具有以下特性:

  • 当队列满的时候,继续⼊队列就会阻塞,直到有其他线程从队列中取⾛元素.
  • 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插⼊元素

阻塞队列的⼀个典型应⽤场景就是"生产者消费者模型".这是⼀种⾮常典型的开发模型
生产者

消费者模式就是通过⼀个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以生产者生产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,⽽是直接从阻塞队列⾥取.

生产者消费者模型的实现一共有两大步骤:

  1. 实现生产者
  2. 实现消费者

二. 标准库中的阻塞队列

在Java标准库中内置了阻塞队列.如果我们需要在⼀些程序中使⽤阻塞队列,直接使⽤标准库中的即
可.

  • BlockingQueue是⼀个接⼝.真正实现的类是LinkedBlockingQueue
  • put⽅法⽤于阻塞式的⼊队列,take⽤于阻塞式的出队列
  • BlockingQueue也有offer,poll,peek等⽅法,但是这些⽅法不带有阻塞特性

阻塞队列的伪代码如下:

BlockingQueue<String> queue = new LinkedBlockingQueue<>();

// ⼊队列
queue.put("abc");

// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();

三. 生产者消费者模型

public static void main(String[] args) throws InterruptedException {

	BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
	
	Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println("消费元素: " + value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");
	
customer.start();

	Thread producer = new Thread(() -> {
		Random random = new Random();
		while (true) {
			try {
				int num = random.nextInt(1000);
				System.out.println("生产元素: " + num);
				blockingQueue.put(num);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "生产者");
	
	producer.start();
	customer.join();
	producer.join();
}

四. 阻塞队列实现

  1. 通过"循环队列"的⽅式来实现.
  2. 使⽤synchronized进⾏加锁控制.
  3. put插⼊元素的时候,判定如果队列满了,就进⾏wait.(注意,要在循环中进⾏wait.被唤醒时不⼀定,队列就不满了,因为同时可能是唤醒了多个线程).
  4. take取出元素的时候,判定如果队列为空,就进⾏wait.(也是循环wait)
public class BlockingQueue {

	private int[] items = new int[1000];
	private volatile int size = 0;
	private volatile int head = 0;
	private volatile int tail = 0;
	
	public void put(int value) throws InterruptedException {
		synchronized (this) {
		// 此处最好使⽤ while.
		// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
		// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能⼜已经队列满了
		// 就只能继续等待
			while (size == items.length) {
				wait();
			}
			items[tail] = value;
			tail = (tail + 1) % items.length;
			size++;
			notifyAll();
		}
	}
	
	public int take() throws InterruptedException {
		int ret = 0;
		synchronized (this) {
			while (size == 0) {
				wait();
			}
			ret = items[head];
			head = (head + 1) % items.length;
			size--;
			notifyAll();
		}
		return ret;
	}
	
	public synchronized int size() {
		return size;
	}
	
	// 测试代码
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue blockingQueue = new BlockingQueue();
		Thread customer = new Thread(() -> {
		while (true) {
			try {
				int value = blockingQueue.take();
				System.out.println(value);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}, "消费者");	
	
		customer.start();
		Thread producer = new Thread(() -> {
			Random random = new Random();
				while (true) {					try {
						blockingQueue.put(random.nextInt(10000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}, "生产者");
			
		producer.start();
		customer.join();
		producer.join();
	}
}

总结

  1. 阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能⼒.(削峰填⾕)
  2. 阻塞队列也能使生产者和消费者之间解耦.

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部