MelonBlog

java自带的延迟队列:DelayQueue

平常我们使用延迟队列时, 都会考虑rabbit MQ或者rocket MQ,那小项目如果我们也想用延迟队列的时候怎么办呢?比如我们一个小项目里也需要针对预约或者订单的过期进行通知,这时候如果有一个延迟队列, 业务实现起来就方便多了。

java还真的自带了一个延迟队列:DelayQueue

DelayQueue

先通过一张图来看看DelayQueue的类结构:

image

流量控制

DelayQueue实现了BlockingQueue接口并且继承了AbstractQueue抽象类。

BlockingQueue接口是java1.5引入的,它的功能是为队列提供流量控制(FLow Controller)。当一个线程往一个已经满了的BLockingQueue里添加元素时,会被阻塞住,直到其他线程处理完队列里面的数据之后腾出空间了,被阻塞的线程才能成功添加新的元素到队列里。当一个BlockingQueue为空时,如果一个线程执行delete操作,也会被阻塞,直到有新的数据入队。

image

优先级排序

并且DelayQueue实际存储元素的地方是一个优先级队列,队列元素会根据过期时间的长短进行排序。

image

也就是说,DelayQueue会保证最早过期的数据最早被消费。


队列元素

根据DelayQueue类的定义,我们可以看到这里用到了一个泛型<E extends Delayed> 。只有泛型E才能被放入队列里,而E是一个实现了Delayed接口的子类。

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * {@code compareTo} method that provides an ordering consistent with
 * its {@code getDelay} method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

所以我们在写业务的时候,需要自定义一个Delayed子类,并且提供一个获取实例对象的延迟时间方法。

例子🌰

自定义延迟消息

public class DelayedMessage implements Delayed {
    private final long expiredTime;
    private final Object data;
    public Object getData() {
        return data;
    }
    public DelayedMessage(long expiredTime, Object data) {
        this.expiredTime = expiredTime;
        this.data = data;
    }
    @Override
    public String toString() {
        return "DelayedMessage{" +
                "expiredTime=" + expiredTime +
                ", data=" + data +
                '}';
    }
    @Override
    public long getDelay(TimeUnit unit) {
        long diffTime = expiredTime - System.currentTimeMillis();
        return unit.convert(diffTime, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

使用DelayQueue

public class DelayQueueDemo {
    public static void main(String[] args) {
        DelayQueue<DelayedMessage> delayQueue = new DelayQueue<>();
        long baseTime = System.currentTimeMillis();
        delayQueue.put(new DelayedMessage(baseTime + 5000, "5s"));
        delayQueue.put(new DelayedMessage(baseTime + 6000, "6s"));
        delayQueue.put(new DelayedMessage(baseTime + 3000, "3s"));
        delayQueue.put(new DelayedMessage(baseTime + 4000, "4s"));
        delayQueue.put(new DelayedMessage(baseTime + 7000, "7s"));
        delayQueue.forEach(message -> System.out.println(message));
        new Thread(() -> {
            while (!delayQueue.isEmpty()) {
                try {
                    DelayedMessage message = delayQueue.take();
                    System.out.println(message.getData());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("main thread end");
    }
}

控制台:

DelayedMessage{expiredTime=1707362591673, data=3s}
DelayedMessage{expiredTime=1707362592673, data=4s}
DelayedMessage{expiredTime=1707362593673, data=5s}
DelayedMessage{expiredTime=1707362594673, data=6s}
DelayedMessage{expiredTime=1707362595673, data=7s}
main thread end
3s
4s
5s
6s
7s