发布者-订阅者实践

subscriber-publisher

Posted by Static on April 29, 2020

参考 apache eventbus,实现订阅者-发布者模式


what?

1. 简介

发布者-订阅者实现了观察者模式,消息通知负责人通过中间商去注册/注销观察者,最后由消息通知负责人给观察者发布消息。

2. 成员

Agent:代理商(发布者),负责注册订阅者,消息的发布,具体逻辑实现类

@Subscribe:标识订阅者

@Prior:在同一类型订阅者中(相同的方法入参类型)的vip

SubscriberMessage:订阅者信息

Report:具体推送的消息


why?

优点:

  1. 解耦了事件的发布者和订阅者,专人做专事
  2. 简化了代码,业务逻辑更清晰
  3. 可定制化属性,比如线程,异步化,同步化

how?

1. 流程图:

2. 使用:

public class Subscriber {

    @Subscribe
    @Prior
    public void toDo(String action){
        System.out.println("I am Subscriber");
    }

}

// 同步推送消息
Agent syncAgent = new Agent();        // 新建代理人
syncAgent.register(new Subscriber()); // 注册订阅者
syncAgent.push(new Report("I come!"));// 发送消息

// 异步推送消息
Agent syncAgent = new Agent(true);        // 新建代理人
syncAgent.register(new Subscriber()); // 注册订阅者
syncAgent.push(new Report("I come!"));// 发送消息

3. 具体代码实现:

  • 注解:
/**
 * Created by wangzhx on 2019/7/14.
 * 同一类型下的优先排序
 */
@Target(value = {ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Prior {
    String message() default "";

    int rank() default 0;

}

/**
 * Created by wangzhx on 2019/6/25.
 * 订阅者
 */
@Target(value = {ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
}
  • Agent(重点):
/**
 * Created by wangzhx on 2019/7/10.
 * 中介
 * 1. 注册订阅者
 * 2. 给订阅者推消息
 */
public class Agent {
    /**
     * 存储订阅者方法的信息
     * 储存格式:{className:SubscriberMessages}
     * <p>
     * 容器
     */
    private Map<String, List<SubscriberMessage>> container;
    /**
     * vip容器
     */
    private Map<String, List<SubscriberMessage>> priorContainer;
    /**
     * 线程池,可自定义实现
     */
    private ThreadPoolExecutor executor;

    /**
     * 是否异步
     */
    private boolean isAsync;


    public Agent() {
    }

    public Agent(boolean isAsync) {
        this.isAsync = isAsync;
    }

    /**
     * 注册订阅者
     */
    public void register(Object subscriber) {
        if (subscriber == null) {
            return;
        }
        setContainer(subscriber);
    }

    /**
     * 发送消息给订阅者
     */
    public void push(Report report) {
        if (report == null) {
            return;
        }
   
        execute(report.getClazzName(), report);
    }

    /**
     * 若类添加了@Prior,则添加@Subscribe的所有方法都是优先级较高
     * 若方法添加了@Prior,则添加@Subscribe的方法是优先级较高
     */
    private void setContainer(Object subscriber) {
        Class<?> clazz = subscriber.getClass();
        Prior clazzPrior = clazz.getDeclaredAnnotation(Prior.class);
        Method[] declaredMethods = clazz.getDeclaredMethods();
        if (declaredMethods.length != 0) {
            for (Method method : declaredMethods) {
                Subscribe subscribe = method.getAnnotation(Subscribe.class);
                if (subscribe != null) {
                    SubscriberMessage message = SubscriberMessage.builder().
                            methodMessage(method).
                            subscriber(subscriber).
                            clazz(clazz);
                    if (clazzPrior != null) {
                        initContainer(getKey(method), message, clazzPrior);
                    } else {
                        Prior methodPrior = method.getAnnotation(Prior.class);
                        initContainer(getKey(method), message, methodPrior);
                    }
                }
            }
        }
    }

    /**
     * 初始化容器
     */
    protected void initContainer(String key, SubscriberMessage message, Prior prior) {
        if (key == null || message == null) {
            return;
        }
        if (prior != null) {
            if (priorContainer == null) {
                priorContainer = new ConcurrentHashMap<>();
            }
            setMessages(key, message, getMessages(key, priorContainer), priorContainer);
        } else {
            if (container == null) {
                container = new ConcurrentHashMap<>();
            }
            setMessages(key, message, getMessages(key, container), container);
        }
    }

    /**
     * 获取订阅者消息
     */
    private List<SubscriberMessage> getMessages(String key, Map<String, List<SubscriberMessage>> container) {
        if (container == null) {
            container = new ConcurrentHashMap<>();
        }
        return container.get(key);
    }

    private void setMessages(String key,
                             SubscriberMessage message,
                             List<SubscriberMessage> messages,
                             Map<String, List<SubscriberMessage>> container) {
        if (messages != null) {
            messages.add(message);
        } else {
            messages = new ArrayList<>();
            messages.add(message);
            container.put(key, messages);
        }
    }

    /**
     * 初始化线程池
     */
    private void initThreadPool() {
        if (Objects.isNull(executor)) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
            setExecutor(executor);
        }
    }

    /**
     * 获取容器的key(订阅者入参类型)
     */
    private String getKey(Method method) {
        if (method == null || method.getParameterTypes().length == 0) {
            return null;
        }
        return method.getParameterTypes()[0].getSimpleName();
    }

    /**
     * 推送消息,订阅者消费
     */
    private void execute(String containerKey, Report report) {
        // 先推送给vip
        doExecute(containerKey, report, priorContainer);
        // 再推送给普通用户
        doExecute(containerKey, report, container);
    }

    /**
     * 具体执行
     * 1. 若是异步,则初始化线程池执行
     * 2. 若是同步,则同步执行
     */
    private void doExecute(String containerKey, Report report, Map<String, List<SubscriberMessage>> container) {
        if (isAsync) {
            initThreadPool();
            if (checkNullMap(container) && checkNullList(container.get(containerKey))) {
                container.get(containerKey).forEach(message -> executor.execute(
                        InvokeTask.newInstance(() -> message.invoke(report.getContent()))));
            }
        } else {
            if (checkNullMap(container) && checkNullList(container.get(containerKey))) {
                container.get(containerKey).forEach(message -> message.invoke(report.getContent()));
            }
        }
    }

    public boolean checkNullList(List<?> messages) {
        return messages != null && messages.size() != 0;
    }

    public boolean checkNullMap(Map<?, ?> container) {
        return container != null && container.size() != 0;
    }

    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

}
  • Report:
/**
 * Created by wangzhx on 2019/7/10.
 * 负责装载消息给订阅者
 */
@AllArgsConstructor
@Data
public class Report {
    private Object content;

    public String getClazzName() {
        if (content != null) {
            return content.getClass().getSimpleName();
        }
        return null;
    }
}
  • SubscriberMessage:
/**
 * 订阅者信息,维度到方法
 */
@Data
public class SubscriberMessage {
    // 订阅者
    private Object subscriber;
    // 阅读者的类
    private Class clazz;
    // 阅读者方法
    private Method method;
    private int parameterCount = 1;
    // 方法入参类型
    private Class<?> parameterType;
    private boolean isPrior;

    private SubscriberMessage() {
    }

    /**
     * 入参暂支持一个
     */
    public void setMethodMessage(Method method) {
        setMethod(method);
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes == null) {
            setParameterCount(0);
            throw new RuntimeException();
        }
        setParameterType(parameterTypes[0]);
    }

    /**
     * 构造者模式
     */
    public static SubscriberMessage builder() {
        return new SubscriberMessage();
    }

    public SubscriberMessage methodMessage(Method method) {
        setMethodMessage(method);
        return this;
    }

    public SubscriberMessage clazz(Class clazz) {
        setClazz(clazz);
        return this;
    }

    public SubscriberMessage subscriber(Object subscriber) {
        setSubscriber(subscriber);
        return this;
    }

    /**
     * 订阅者消费
     */
    void invoke(Object arg) {
        try {
            method.invoke(subscriber, arg);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException();
        }
    }

}