一文带你彻底掌握阻塞队列!( 四 )

同样的,把最上面的样例Container中的阻塞队列实现类换成PriorityBlockingQueue,调整如下:
/** * 初始化阻塞队列 */private final BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();生产者插入数据的内容,我们改下插入顺序 。
/** * 生产者 */public class Producer extends Thread {private Container container;public Producer(Container container) {this.container = container;}@Overridepublic void run() {container.add(5);container.add(3);container.add(1);container.add(2);container.add(0);container.add(4);}}最后运行结果如下:
生产者:Thread-0,add:5生产者:Thread-0 , add:3生产者:Thread-0,add:1生产者:Thread-0,add:2生产者:Thread-0 , add:0生产者:Thread-0,add:4消费者:Thread-1 , value:0消费者:Thread-1,value:1消费者:Thread-1 , value:2消费者:Thread-1 , value:3消费者:Thread-1 , value:4消费者:Thread-1,value:5从日志上可以很明显看出 , 对于整数,默认情况下,按照升序排序 , 消费者默认从 0 开始处理 。
3.5、DelayQueueDelayQueue是一个线程安全的延迟队列 , 存入队列的元素不会立刻被消费,只有到了其指定的延迟时间,才能够从队列中出队 。
底层采用的是PriorityQueue来存储元素,DelayQueue的特点在于:插入队列中的数据可以按照自定义的delay时间进行排序,快到期的元素会排列在前面 , 只有delay时间小于 0 的元素才能够被取出 。
部分核心源码如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {/**采用ReentrantLock进行加锁*/private final transient ReentrantLock lock = new ReentrantLock();/**采用PriorityQueue进行存储数据*/private final PriorityQueue<E> q = new PriorityQueue<E>(); /**条件等待与通知*/private final Condition available = lock.newCondition();/**入队操作*/public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}/**出队操作*/public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}}同样的,把最上面的样例Container中的阻塞队列实现类换成DelayQueue,代码如下:
public class Container {/*** 初始化阻塞队列*/private final BlockingQueue<DelayedUser> queue = new DelayQueue<DelayedUser>();/*** 添加数据到阻塞队列* @param value*/public void add(DelayedUser value) {try {queue.put(value);System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);} catch (InterruptedException e) {e.printStackTrace();}}/*** 从阻塞队列获取数据*/public void get() {try {DelayedUser value = https://www.isolves.com/it/cxkf/jiagou/2023-12-15/queue.take();String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println(time + " 消费者:"+ Thread.currentThread().getName()+",value:" + value);} catch (InterruptedException e) {e.printStackTrace();}}}DelayQueue队列中的元素需要显式实现Delayed接口 , 定义一个DelayedUser类,代码如下:
public class DelayedUser implements Delayed {/*** 当前时间戳*/private long start;/*** 延迟时间(单位:毫秒)*/private long delayedTime;/*** 名称*/private String name;public DelayedUser(long delayedTime, String name) {this.start = System.currentTimeMillis();this.delayedTime = delayedTime;this.name = name;}@Overridepublic long getDelay(TimeUnit unit) {// 获取当前延迟的时间long diffTime = (start + delayedTime) - System.currentTimeMillis();return unit.convert(diffTime,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {// 判断当前对象的延迟时间是否大于目标对象的延迟时间return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "DelayedUser{" +"delayedTime=" + delayedTime +", name='" + name + ''' +'}';}}生产者插入数据的内容,做如下调整 。
/** * 生产者 */public class Producer extends Thread {private Container container;public Producer(Container container) {this.container = container;}@Overridepublic void run() {for (int i = 0; i < 6; i++) {container.add(new DelayedUser(1000 * i, "张三" +i));}}}最后运行结果如下:
生产者:Thread-0,add:DelayedUser{delayedTime=0, name='张三0'}生产者:Thread-0,add:DelayedUser{delayedTime=1000, name='张三1'}生产者:Thread-0,add:DelayedUser{delayedTime=2000, name='张三2'}生产者:Thread-0,add:DelayedUser{delayedTime=3000, name='张三3'}生产者:Thread-0,add:DelayedUser{delayedTime=4000, name='张三4'}生产者:Thread-0,add:DelayedUser{delayedTime=5000, name='张三5'}2023-11-03 14:55:33 消费者:Thread-1,value:DelayedUser{delayedTime=0, name='张三0'}2023-11-03 14:55:34 消费者:Thread-1 , value:DelayedUser{delayedTime=1000, name='张三1'}2023-11-03 14:55:35 消费者:Thread-1,value:DelayedUser{delayedTime=2000, name='张三2'}2023-11-03 14:55:36 消费者:Thread-1,value:DelayedUser{delayedTime=3000, name='张三3'}2023-11-03 14:55:37 消费者:Thread-1,value:DelayedUser{delayedTime=4000, name='张三4'}2023-11-03 14:55:38 消费者:Thread-1,value:DelayedUser{delayedTime=5000, name='张三5'}


推荐阅读