java自带的延迟队列:DelayQueue
平常我们使用延迟队列时, 都会考虑rabbit MQ或者rocket MQ,那小项目如果我们也想用延迟队列的时候怎么办呢?比如我们一个小项目里也需要针对预约或者订单的过期进行通知,这时候如果有一个延迟队列, 业务实现起来就方便多了。
java还真的自带了一个延迟队列:DelayQueue
DelayQueue
先通过一张图来看看DelayQueue的类结构:

流量控制
DelayQueue实现了BlockingQueue接口并且继承了AbstractQueue抽象类。
BlockingQueue接口是java1.5引入的,它的功能是为队列提供流量控制(FLow Controller)。当一个线程往一个已经满了的BLockingQueue里添加元素时,会被阻塞住,直到其他线程处理完队列里面的数据之后腾出空间了,被阻塞的线程才能成功添加新的元素到队列里。当一个BlockingQueue为空时,如果一个线程执行delete操作,也会被阻塞,直到有新的数据入队。

优先级排序
并且DelayQueue实际存储元素的地方是一个优先级队列,队列元素会根据过期时间的长短进行排序。

也就是说,DelayQueue会保证最早过期的数据最早被消费。
队列元素
根据DelayQueue类的定义,我们可以看到这里用到了一个泛型<E extends Delayed> 。只有泛型E才能被放入队列里,而E是一个实现了Delayed接口的子类。
/** * A mix-in style interface for marking objects that should be * acted upon after a given delay. * * <p>An implementation of this interface must define a * {@code compareTo} method that provides an ordering consistent with * its {@code getDelay} method. * * @since 1.5 * @author Doug Lea */public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit);}所以我们在写业务的时候,需要自定义一个Delayed子类,并且提供一个获取实例对象的延迟时间方法。
例子🌰
自定义延迟消息
public class DelayedMessage implements Delayed { private final long expiredTime; private final Object data; public Object getData() { return data; } public DelayedMessage(long expiredTime, Object data) { this.expiredTime = expiredTime; this.data = data; } @Override public String toString() { return "DelayedMessage{" + "expiredTime=" + expiredTime + ", data=" + data + '}'; } @Override public long getDelay(TimeUnit unit) { long diffTime = expiredTime - System.currentTimeMillis(); return unit.convert(diffTime, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); }}使用DelayQueue
public class DelayQueueDemo { public static void main(String[] args) { DelayQueue<DelayedMessage> delayQueue = new DelayQueue<>(); long baseTime = System.currentTimeMillis(); delayQueue.put(new DelayedMessage(baseTime + 5000, "5s")); delayQueue.put(new DelayedMessage(baseTime + 6000, "6s")); delayQueue.put(new DelayedMessage(baseTime + 3000, "3s")); delayQueue.put(new DelayedMessage(baseTime + 4000, "4s")); delayQueue.put(new DelayedMessage(baseTime + 7000, "7s")); delayQueue.forEach(message -> System.out.println(message)); new Thread(() -> { while (!delayQueue.isEmpty()) { try { DelayedMessage message = delayQueue.take(); System.out.println(message.getData()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("main thread end"); }}控制台:
DelayedMessage{expiredTime=1707362591673, data=3s}DelayedMessage{expiredTime=1707362592673, data=4s}DelayedMessage{expiredTime=1707362593673, data=5s}DelayedMessage{expiredTime=1707362594673, data=6s}DelayedMessage{expiredTime=1707362595673, data=7s}main thread end3s4s5s6s7s