MelonBlog

java实现一个发布订阅模式

用java模拟一个消息队列(发布订阅模式)

思路

常见的消息队列有4个元素

topic
consumer
producer
broker

topic作为一个主题来关联producer和consumer

consumer用来消费消息

producer用来产生消息

broker作为一个协调器来协调整个队列的运作

实现

topic

topic需要存出一个消费者集合和消息缓存

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Topic {
    private String name;
    private List<Consumer> consumerList;
    private List<Object> messageList;
}

consumer接口

所有的消费者实现类都要实现这个接口

public interface Consumer {
    void receiveMessage(Object message);
}

broker

broker需要实现创建topic、协助consumer订阅topic、协助producer发布消息、协助consumer消费消息等操作

@Slf4j
@Component
public class PubSubManager {
    private final Map<String, Topic> topicMap;
    private final ExecutorService producerThreadPool;
    private final ExecutorService consumerThreadPool;
    private PubSubManager() {
        topicMap = new ConcurrentHashMap<>();
        producerThreadPool = Executors.newFixedThreadPool(5);
        consumerThreadPool = Executors.newFixedThreadPool(5);
    }
    private static class PubSubManagerHolder {
        private static final PubSubManager INSTANCE = new PubSubManager();
    }
    public static PubSubManager getInstance() {
        return PubSubManagerHolder.INSTANCE;
    }
    /**
     * 创建主题
     */
    void createTopic(String topicName) {
        Topic topic = Topic.builder().name(topicName).messageList(new ArrayList<>()).consumerList(new ArrayList<>()).build();
        topicMap.put(topicName, topic);
    }
    /**
     * 订阅主题
     */
    void subscribe(String topicName, Consumer consumer) {
        Topic topic = topicMap.get(topicName);
        if (topic == null) {
            throw new RuntimeException("topic not exist");
        }
        topic.getConsumerList().add(consumer);
    }
    /**
     * 取消订阅
     *
     * @author yehao
     */
    void unsubscribe(String topicName, Consumer consumer) {
        Topic topic = topicMap.get(topicName);
        if (topic == null) {
            throw new RuntimeException("topic not exist");
        }
        topic.getConsumerList().remove(consumer);
    }
    /**
     * 生产消息
     */
    public void produce(String topicName, Object message) {
        Topic topic = topicMap.get(topicName);
        if (topic == null) {
            throw new RuntimeException("topic not exist");
        }
        producerThreadPool.submit(() -> consume(topic, message));
    }
    /**
     * 消费消息
     */
    private void consume(Topic topic, Object message) {
        topic.getMessageList().add(message);
        List<Future<?>> futureList = new ArrayList<>();
        for (Consumer consumer : topic.getConsumerList()) {
            futureList.add(consumerThreadPool.submit(() -> consumer.receiveMessage(message)));
        }
        for (Future<?> future : futureList) {
            try {
                future.get();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        topic.getMessageList().remove(message);
    }
}

测试

实现一个测试consumer

public class TestConsumer implements Consumer {
    @Override
    public void receiveMessage(Object message) {
        System.out.printf("TestConsumer[%s] receive message: %s%n", hashCode(), message);
    }
}


将主线程作为producer,每秒产生一条消息

public class Test {
    public static void main(String[] args) {
        PubSubManager pubSubManager = PubSubManager.getInstance();
        pubSubManager.createTopic("test");
        pubSubManager.subscribe("test", new TestConsumer());
        pubSubManager.subscribe("test", new TestConsumer());
        pubSubManager.subscribe("test", new TestConsumer());
        while (true) {
            pubSubManager.produce("test", "hello");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


控制台消息:

TestConsumer[350340640] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[350340640] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[350340640] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[350340640] receive message: hello

友情链接


Copyright © 2023 www.tiangua.info 鄂ICP备2022015834号