java实现一个发布订阅模式
用java模拟一个消息队列(发布订阅模式)
思路
常见的消息队列有4个元素
topic
consumer
producer
broker
topic作为一个主题来关联producer和consumer
consumer用来消费消息
producer用来产生消息
broker作为一个协调器来协调整个队列的运作
实现
topic
topic需要存储一个消费者集合和消息缓存
/**
 * A named topic holding its subscribed consumers and a list of messages.
 *
 * <p>Lombok generates the builder, both constructors and the accessors. The two list
 * fields carry {@code @Builder.Default} initializers so that a builder call which omits
 * them does not leave a {@code null} list behind. Recent Lombok releases also apply
 * these initializers in the generated no-args constructor — verify against the
 * project's Lombok version.
 */
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Topic {

    /** Topic name. */
    private String name;

    /** Consumers subscribed to this topic; copy-on-write so it can be iterated while it is modified. */
    @Builder.Default
    private List<Consumer> consumerList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Messages currently held by this topic. */
    @Builder.Default
    private List<Object> messageList = new java.util.concurrent.CopyOnWriteArrayList<>();
}
consumer接口
所有的消费者实现类都要实现这个接口
/**
 * Callback contract for message consumers.
 *
 * <p>Declared as a functional interface so a consumer can also be supplied as a lambda
 * or method reference.
 */
@FunctionalInterface
public interface Consumer {

    /**
     * Handles one message delivered to this consumer.
     *
     * @param message the delivered payload
     */
    void receiveMessage(Object message);
}
broker
broker需要实现创建topic、协助consumer订阅topic、协助producer发布消息、协助consumer消费消息等操作
/**
 * Broker of the publish/subscribe demo: owns the topics, registers and removes
 * subscribers, and fans each published message out to the topic's consumers.
 *
 * <p>Publishing is asynchronous: {@link #produce} hands the message to the producer
 * pool, whose task submits one delivery per subscriber to the consumer pool and waits
 * for all of them before dropping the message from the topic's cache.
 *
 * <p>NOTE(review): the class is both a {@code @Component} and a hand-written singleton.
 * If the container instantiates it as a bean, that bean and {@link #getInstance()}
 * would be two different objects with separate topic maps — confirm which access path
 * is intended.
 *
 * <p>NOTE(review): neither thread pool is shut down anywhere in this class.
 */
@Slf4j
@Component
public class PubSubManager {

    /** Topics by name. */
    private final Map<String, Topic> topicMap;

    /** Runs the per-message fan-out tasks started by {@link #produce}. */
    private final ExecutorService producerThreadPool;

    /** Runs the individual {@code receiveMessage} calls. */
    private final ExecutorService consumerThreadPool;

    private PubSubManager() {
        topicMap = new ConcurrentHashMap<>();
        producerThreadPool = Executors.newFixedThreadPool(5);
        consumerThreadPool = Executors.newFixedThreadPool(5);
    }

    /** Holder class: the instance is created when this class is first initialized. */
    private static class PubSubManagerHolder {
        private static final PubSubManager INSTANCE = new PubSubManager();
    }

    /**
     * Returns the instance held by {@link PubSubManagerHolder}.
     *
     * @return the shared manager
     */
    public static PubSubManager getInstance() {
        return PubSubManagerHolder.INSTANCE;
    }

    /**
     * Creates a topic if no topic with that name exists yet.
     *
     * <p>An existing topic is left untouched, so calling this twice no longer discards
     * the subscribers that were already registered.
     *
     * @param topicName name of the topic to create
     */
    void createTopic(String topicName) {
        topicMap.computeIfAbsent(topicName, PubSubManager::newTopic);
    }

    /** Builds an empty topic whose lists tolerate concurrent subscribe/publish calls. */
    private static Topic newTopic(String topicName) {
        // Iterated on producer threads while subscribe/unsubscribe may modify it.
        List<Consumer> consumers = new java.util.concurrent.CopyOnWriteArrayList<>();
        // Added to and removed from by several producer threads at once.
        List<Object> messages = java.util.Collections.synchronizedList(new ArrayList<>());
        return Topic.builder()
                .name(topicName)
                .consumerList(consumers)
                .messageList(messages)
                .build();
    }

    /**
     * Subscribes a consumer to a topic.
     *
     * @param topicName name of an existing topic
     * @param consumer  consumer to register
     * @throws IllegalArgumentException if the topic does not exist
     */
    void subscribe(String topicName, Consumer consumer) {
        requireTopic(topicName).getConsumerList().add(consumer);
    }

    /**
     * Cancels a subscription.
     *
     * @param topicName name of an existing topic
     * @param consumer  consumer to remove
     * @throws IllegalArgumentException if the topic does not exist
     * @author yehao
     */
    void unsubscribe(String topicName, Consumer consumer) {
        requireTopic(topicName).getConsumerList().remove(consumer);
    }

    /**
     * Publishes a message to a topic; delivery happens asynchronously on the pools.
     *
     * @param topicName name of an existing topic
     * @param message   payload handed to every subscriber
     * @throws IllegalArgumentException if the topic does not exist
     */
    public void produce(String topicName, Object message) {
        Topic topic = requireTopic(topicName);
        producerThreadPool.submit(() -> consume(topic, message));
    }

    /** Looks up a topic by name, failing with the shared "topic not exist" error. */
    private Topic requireTopic(String topicName) {
        Topic topic = topicMap.get(topicName);
        if (topic == null) {
            throw new IllegalArgumentException("topic not exist");
        }
        return topic;
    }

    /**
     * Delivers one message to every consumer of the topic and waits for all deliveries.
     *
     * <p>The message sits in the topic's message list for the duration of the delivery
     * and is removed afterwards, even when dispatching fails.
     */
    private void consume(Topic topic, Object message) {
        List<Object> pending = topic.getMessageList();
        pending.add(message);
        try {
            List<Future<?>> deliveries = new ArrayList<>();
            for (Consumer consumer : topic.getConsumerList()) {
                deliveries.add(consumerThreadPool.submit(() -> consumer.receiveMessage(message)));
            }
            awaitAll(deliveries);
        } finally {
            pending.remove(message);
        }
    }

    /** Waits for each delivery, logging failures; stops waiting if this thread is interrupted. */
    private void awaitAll(List<Future<?>> deliveries) {
        for (Future<?> delivery : deliveries) {
            try {
                delivery.get();
            } catch (InterruptedException e) {
                // Restore the flag so the owning pool can observe the interruption.
                Thread.currentThread().interrupt();
                log.error(e.getMessage(), e);
                return;
            } catch (java.util.concurrent.ExecutionException
                    | java.util.concurrent.CancellationException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}
测试
实现一个测试consumer
/**
 * Demo consumer that prints every message it receives to standard output.
 */
public class TestConsumer implements Consumer {

    /**
     * Prints the message, tagged with this instance's hash code so the output shows
     * which consumer handled it.
     *
     * @param message the delivered payload
     */
    @Override
    public void receiveMessage(Object message) {
        int id = hashCode();
        String line = String.format("TestConsumer[%s] receive message: %s%n", id, message);
        System.out.print(line);
    }
}
将主线程作为producer,每秒产生一条消息
/**
 * Demo entry point: the main thread acts as the producer and publishes one message
 * per second to a topic with three subscribers.
 */
public class Test {

    /**
     * Creates the "test" topic, subscribes three {@code TestConsumer} instances and
     * publishes "hello" once a second until the main thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PubSubManager pubSubManager = PubSubManager.getInstance();
        pubSubManager.createTopic("test");
        pubSubManager.subscribe("test", new TestConsumer());
        pubSubManager.subscribe("test", new TestConsumer());
        pubSubManager.subscribe("test", new TestConsumer());

        while (!Thread.currentThread().isInterrupted()) {
            pubSubManager.produce("test", "hello");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag; the loop condition then stops producing instead of
                // ignoring the interruption and looping forever.
                Thread.currentThread().interrupt();
            }
        }
    }
}
控制台消息:
TestConsumer[350340640] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[350340640] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[350340640] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[350340640] receive message: hello