-
Notifications
You must be signed in to change notification settings - Fork 10
MessageProcessor
MessageProcessor 是接收服务端(Server)分发消息的处理器,可以根据自己的业务消息管道来自定义创建,一个消息处理器(MessageProcessor)绑定一个 直接目标 消息管道或 正则表达式 消息管道。
-
MessageProcessorType#SPECIFIC
@Override public MessageProcessorType processorType() { return MessageProcessorType.SPECIFIC; }
具体绑定指定消息管道名称,基于java8的新特性,
MessageProcessor#processorType方法的默认值就是该类型。 -
MessageProcessorType#REGEX
@Override public MessageProcessorType processorType() { return MessageProcessorType.REGEX; }
正则表达式方式绑定消息管道名称,如:
user.+,则会匹配全部以user.开头的消息管道,那么我们如果存在user.1、user.2两个消息管道,都会由本消息处理器(MessageProcessor)进行消息处理。
一个消息处理器(MessageProcessor)按正常来说是只能绑定处理一个消息管道(MessagePipe),那如果消息处理器的类型是MessageProcessorType#REGEX`,则可以支持处理器处理符合表达式的全部消息管道消息。
MessageProcessorType#SPECIFIC类型:
@Override
public String bindingPipeName() {
return "user";
}MessageProcessorType#REGEX类型:
@Override
public String bindingPipeName() {
return "user.+";
}如果消息处理器(MessageProcessor)的类型为MessageProcessorType#REGEX,那么针对每个符合表达式的消息管道都会创建该消息处理器的代理对象,并通过MessageProcessorManager类进行管理维护以及缓存。
/**
* Get {@link MessageProcessor} instance from {@link #processorMap}
*
* @param pipeName message pipe name
* @return message pipe binding {@link MessageProcessor}
*/
public synchronized MessageProcessor getMessageProcessor(String pipeName) {
MessageProcessor processor = this.regexGetMessageProcessor(pipeName);
if (ObjectUtils.isEmpty(processor)) {
throw new MessagePipeException("Message pipeline: " + pipeName + ", there is no bound MessageProcessor.");
}
// get message processor proxy instance
if (MessageProcessorType.REGEX == processor.processorType() && !this.processorMap.containsKey(pipeName)) {
MessageProcessor proxyProcessor = MessageProcessorProxy.getProxy(processor.getClass());
this.processorMap.put(pipeName, proxyProcessor);
return proxyProcessor;
}
return processor;
}在调用MessageProcessorManager#getMessageProcessor获取消息管道绑定的消息处理器时,会根据消息处理器的类型进行判断,如果是正则表达式类型(MessageProcessorType#REGEX)则会进行创建或者从缓存集合中获取消息处理器的代理类(Proxy),代理是通过Spring提供的Cglib方式实现。
客户端(Client)在启动时同样会通过Grpc创建一个服务,而这个服务就是为了接收服务端(Server)分发的消息而创建的。
ReceiveMessageService#messageProcessing方法源码:
@Override
public void messageProcessing(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
MessageResponseBody responseBody = new MessageResponseBody();
try {
String requestJsonBody = request.getBody();
MessageRequestBody requestBody = JsonUtils.jsonToObject(requestJsonBody, MessageRequestBody.class);
String requestId = requestBody.getRequestId();
requestBody.setRequestId(requestId);
String pipeName = requestBody.getPipeName();
Message message = requestBody.getMessage();
// 获取消息管道绑定的MessageProcessor
MessageProcessor processor = messageProcessorManager.getMessageProcessor(pipeName);
// 将消息分配给MessageProcessor并执行处理,并将执行的结果反馈给服务端
boolean result = processor.processing(pipeName, requestId, message);
responseBody.setStatus(result ? MessageResponseStatus.SUCCESS : MessageResponseStatus.ERROR);
} catch (Exception e) {
responseBody.setStatus(MessageResponseStatus.ERROR);
log.error(e.getMessage(), e);
} finally {
String responseJsonBody = JsonUtils.objectToJson(responseBody);
MessageResponse response = MessageResponse.newBuilder().setBody(responseJsonBody).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}在上面方法中,通过MessageProcessorManager#getMessageProcessor方法获取消息管道所绑定的消息处理器(MessageProcessor),消息处理完成后会将结果(result)反馈给服务端,服务端再决定本条消息的删除、重试分发等操作。
- Server
- Client