因为银行客户对Kafka消息队列的接受程度很低,所以需要寻找一种方案替代客户侧落地程序中Kafka。基于Socket的STOMP就是此次调研的备选方案。
1、STOMP传输协议简介
STOMP
中文为: 面向消息的简单文本协议
websocket
定义了两种传输信息类型:文本信息和二进制信息。类型虽然被确定,但是他们的传输体是没有规定的。所以,需要用一种简单的文本传输类型来规定传输内容,它可以作为通讯中的文本传输协议。
STOMP是基于帧的协议,客户端和服务器使用STOMP帧流通讯
一个STOMP客户端是一个可以以两种模式运行的用户代理,可能是同时运行两种模式。
- 作为生产者,通过
SEND
框架将消息发送给服务器的某个服务 - 作为消费者,通过
SUBSCRIBE
制定一个目标服务,通过MESSAGE
框架,从服务器接收消息。
2、STOMP服务端
2.1、依赖
</dependencies>
<!--web依赖是为了开放rest api方便测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2.2、配置
服务端需要实现一个org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer:
package xyz.fanchw.stompdemo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* @author fcw
* @since 0.1.0
*/
@Configuration
// 该注解开启STOMP
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
public static final String JS_END_POINT = "stomp-kafka";
public static final String NORMAL_END_POINT = "ok-https";
public static final String BROKER_PREFIX = "/kafka";
private static final long HEART_BEAT_MILLS = 5000L;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 此处是配置js客户端使用的端口
registry.addEndpoint(JS_END_POINT).setAllowedOrigins("*").withSockJS();
// 此处是配置普通客户端使用的端口
registry.addEndpoint(NORMAL_END_POINT);
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 服务端发送心跳用的线程池
final ThreadPoolTaskScheduler heartBeatPool = new ThreadPoolTaskScheduler();
heartBeatPool.setPoolSize(1);
heartBeatPool.setThreadNamePrefix("stomp-heart-beat-");
heartBeatPool.initialize();
registry.enableSimpleBroker(BROKER_PREFIX)
.setHeartbeatValue(new long[]{HEART_BEAT_MILLS, HEART_BEAT_MILLS})
.setTaskScheduler(heartBeatPool)
;
// 客户端向服务端发送消息时需要带上的全局前缀,类似于server.servlet.context-path
registry.setApplicationDestinationPrefixes("/server");
}
}
2.3、服务端的使用
接下来实现一个controller:
package xyz.fanchw.stompdemo.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.bind.annotation.RestController;
import xyz.fanchw.stompdemo.entity.Article;
/**
* @author fcw
* @since 0.1.0
*/
@Slf4j
@RestController
// 类似于@RequestMapping 客户端发送消息的路由
@MessageMapping("/hello")
public class StompController {
// 该对象可以往指定主题发送消息
private final SimpMessagingTemplate simpMessagingTemplate;
public StompController(SimpMessagingTemplate simpMessagingTemplate) {
this.simpMessagingTemplate = simpMessagingTemplate;
}
@MessageMapping("/world")
// 该注解表明方法返回的对象,最终会发送至哪个主题
@SendTo("/kafka/article")
public Article stompServer(@Payload Article article) {
log.info("Server accept article: {}", article);
final String content = article.getContent();
// 使用SimpMessagingTemplate也可以往指定主题发送消息
// this.simpMessagingTemplate.convertAndSend("/kafka/article", article);
article.setContent(content + "---serverBack");
return article;
}
}
定义了一个Article类承载数据:
package xyz.fanchw.stompdemo.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Article implements Serializable {
private String author;
private String content;
}
至此,服务端的配置到此结束,注意STOMP使用的端口就是server.port端口,无需单独配置,也不会影响HTTP的使用。
3、Java客户端之JS版本
Java客户端依旧是基于SpringBoot构建,但是是通过模拟js的方式进行连接,对应服务端的这个配置:
registry.addEndpoint(JS_END_POINT).setAllowedOrigins("*").withSockJS();
3.1、依赖
客户端依赖于服务端相同
3.2、配置
首先是一个配置用数据类:
package xyz.fanchw.stompdemo.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author fcw
* @since 0.1.0
*/
@Data
@Component
@ConfigurationProperties(prefix = "stomp")
public class StompConfigEntity {
// 客户端订阅的主题
private List<String> topics;
// 服务端地址
private String url;
}
对应配置文件:
server:
port: 3002
stomp:
# 注意此处的地址前缀直接使用http即可,后面要跟上服务端配置的JS_END_POINT即"stomp-kafka"
url: 'http://127.0.0.1:3001/stomp-kafka'
topics: '/kafka/article'
配置类:
package xyz.fanchw.stompdemo.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;
import java.util.ArrayList;
import java.util.List;
/**
* @author fcw
* @since 0.1.0
*/
@Slf4j
@Configuration
// 该注解开启WebSocket功能,用于客户端和服务端连接。
@EnableWebSocket
public class StompClientConfig {
@Bean
public WebSocketStompClient webSocketStompClient(ObjectMapper objectMapper) {
// 心跳线程池
final ThreadPoolTaskScheduler heartBeatPool = new ThreadPoolTaskScheduler();
heartBeatPool.setPoolSize(1);
heartBeatPool.setThreadNamePrefix("stomp-heart-beat-");
heartBeatPool.initialize();
List<Transport> transportList = new ArrayList<>(1);
transportList.add(new WebSocketTransport(new StandardWebSocketClient()));
// 注意此处,最终还是模拟js与服务端交互
final SockJsClient sockJsClient = new SockJsClient(transportList);
sockJsClient.setConnectTimeoutScheduler(heartBeatPool);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
// 使用Jackson序列化/反序列化消息
final MappingJackson2MessageConverter jackson2MessageConverter = new MappingJackson2MessageConverter();
jackson2MessageConverter.setObjectMapper(objectMapper);
stompClient.setTaskScheduler(heartBeatPool);
stompClient.setMessageConverter(jackson2MessageConverter);
stompClient.setDefaultHeartbeat(new long[]{5000L, 5000L});
stompClient.start();
return stompClient;
}
}
这里构建了一个WebSocketStompClient,但是这个客户端并不能直接使用,需要和服务端建立会话后,生成一个StompSession对象。重点观察以下代码中的doConnect()方法:
package xyz.fanchw.stompdemo.hook;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import xyz.fanchw.stompdemo.config.StompConfigEntity;
import xyz.fanchw.stompdemo.handler.ArticleStompHandler;
import javax.annotation.PostConstruct;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author fcw
* @since 0.1.0
*/
@Slf4j
@Component
@EnableScheduling
public class SessionKeeper {
private final WebSocketStompClient stompClient;
private final StompConfigEntity stompConfigEntity;
private final ArticleStompHandler articleStompHandler;
/*———————————————————————————————————————————————————重连并发相关属性———————————————————————————————————————————————————*/
private volatile StompSession stompSession;
private final AtomicLong sessionVersion = new AtomicLong(0L);
private volatile boolean inConnect = false;
private static final long KEEP_PERIOD = 3000L;
private volatile long lastConnectMills;
private final AtomicInteger connectTimeShiftLeft = new AtomicInteger(0);
private final AtomicInteger recentConnectCount = new AtomicInteger(0);
private static final int MAX_RETRY_COUNT = 5;
public SessionKeeper(WebSocketStompClient stompClient,
StompConfigEntity stompConfigEntity,
ArticleStompHandler articleStompHandler) {
this.stompClient = stompClient;
this.stompConfigEntity = stompConfigEntity;
this.articleStompHandler = articleStompHandler;
}
@PostConstruct
public void init() {
tryToCreateStompSession();
}
@Scheduled(fixedRate = KEEP_PERIOD)
public void keepSession() {
if (null != this.stompSession && this.stompSession.isConnected()) {
return;
}
tryToCreateStompSession();
}
public void tryToCreateStompSession() {
final long version = this.sessionVersion.get();
if (this.inConnect) {
return;
}
// inConnect设为true,上面的代码可以获取cas后的version但无法进入此处,直至inConnect设置为false,此时已经完成尝试连接。
// 这样可以避免前面一个线程cas完成后,后面有线程拿到cas后的version继续尝试cas导致的线程不安全
this.inConnect = true;
if (!checkConnectTime()) {
this.inConnect = false;
return;
}
final long newVersion = version + 1;
if (!this.sessionVersion.compareAndSet(version, newVersion)) {
return;
}
// 以下是线程安全的代码
log.info("stomp version: {},lastConnectMills: {},recentConnectCount :{}", newVersion, this.lastConnectMills, this.connectTimeShiftLeft);
doConnect();
}
private void doConnect() {
if (null != this.stompSession && this.stompSession.isConnected()) {
afterConnected();
return;
}
try {
// 这里headers是KV对的集合,类似于HTTP中的header
final StompHeaders stompHeaders = new StompHeaders();
stompHeaders.add("Authorization", "admin");
URI uri = URI.create(this.stompConfigEntity.getUrl());
// connect有多个重载方法,这里除了服务端地址和heanders外,还要传入一个处理Session的Handler。
final StompSession stompSession = this.stompClient
.connect(uri, null, stompHeaders, articleStompHandler)
.get();
// 建立会话后订阅topic
for (String topic : this.stompConfigEntity.getTopics()) {
// 每个topic对应一个FrameHandler
stompSession.subscribe(topic, this.articleStompHandler);
}
this.stompSession = stompSession;
afterConnected();
} catch (Exception e) {
afterConnectException();
log.warn("Failed to tryToCreateStompSession.url : {} ,exception: {}!", this.stompConfigEntity.getUrl(), e.getMessage());
}
}
private boolean checkConnectTime() {
final int count = Math.min(this.connectTimeShiftLeft.get(), MAX_RETRY_COUNT);
long interval = KEEP_PERIOD << count;
return System.currentTimeMillis() - this.lastConnectMills > interval;
}
private void updateConnectTime() {
this.inConnect = false;
this.lastConnectMills = System.currentTimeMillis();
}
private void afterConnected() {
updateConnectTime();
this.connectTimeShiftLeft.set(0);
this.recentConnectCount.set(0);
}
private void afterConnectException() {
this.connectTimeShiftLeft.incrementAndGet();
this.recentConnectCount.incrementAndGet();
updateConnectTime();
}
/*———————————————————————————————————————————————————获取会话———————————————————————————————————————————————————*/
public StompSession getStompSession() {
return this.stompSession;
}
public StompSession getStompSessionSoon() {
int count = 0;
while ((null == this.stompSession || !this.stompSession.isConnected()) && count < MAX_RETRY_COUNT) {
this.connectTimeShiftLeft.set(0);
tryToCreateStompSession();
count++;
}
return this.stompSession;
}
public StompSession getStompSessionWithReTry() {
return getStompSessionWithReTry(MAX_RETRY_COUNT);
}
public StompSession getStompSessionWithReTry(int retry) {
int count = 0;
while ((null == this.stompSession || !this.stompSession.isConnected()) && count < retry) {
tryToCreateStompSession();
count++;
}
return this.stompSession;
}
}
接下来补充之前说到Session和Frame处理:
package xyz.fanchw.stompdemo.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.stereotype.Component;
import xyz.fanchw.stompdemo.entity.Article;
import java.lang.reflect.Type;
/**
* @author fcw
* @since 0.1.0
*/
@Slf4j
@Component
public class ArticleStompHandler extends StompSessionHandlerAdapter {
// 该方法用于返回处理消息中Payload的类型
@Override
public Type getPayloadType(StompHeaders headers) {
return Article.class;
}
// 这里用来处理消息中的Payload,例如落库、转换或者再次发送至其它消息队列中,示例中仅做日志输出
@Override
public void handleFrame(StompHeaders headers, Object payload) {
Article article = (Article) payload;
log.info("payload: {}", article);
}
// 上面是FrameHandler定义的接口,下面是SessionHandler额外定义的接口,不过注意:SessionHandler本身就是
// FrameHandler的子接口。
// 该方法是在如名称所示,在连接建立后的操作,订阅topic动作也可以改到此处执行。
// 不过考虑到创建会话中已经注入了配置数据类,直接订阅主题,此处就无需额外注入一个对象。
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
log.info("StompHeaders: {}", connectedHeaders);
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
super.handleException(session, command, headers, payload, exception);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
super.handleTransportError(session, exception);
}
}
3.3、使用
客户端的使用非常简单,首先建立会话订阅主题后,会自动消费订阅后的消息。注意这里是订阅后的,历史消息目前还未找到原生支持,而消息的处理就在Handler中进行。而客户端向服务端发消息则:
package xyz.fanchw.stompdemo.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSession.Receiptable;
import org.springframework.stereotype.Service;
import xyz.fanchw.stompdemo.hook.SessionKeeper;
import xyz.fanchw.stompdemo.service.StompService;
/**
* @author fcw
* @since 0.1.0
*/
@Slf4j
@Service
public class StompServiceImpl implements StompService {
private final SessionKeeper sessionKeeper;
public StompServiceImpl(SessionKeeper sessionKeeper) {
this.sessionKeeper = sessionKeeper;
}
@Override
public Receiptable send(String destination, Object payload) {
final StompSession stompSession = sessionKeeper.getStompSessionWithReTry();
if (null == stompSession || !stompSession.isConnected()) {
log.warn("StompSession not connect!");
return null;
}
// 这里指定路由和payload直接发送即可。
// 根据之前服务端的配置,这里destination可以填写:"/server/hello/world"
// 首先/server是服务端全局的前缀,类似于server.servlet.context-path
// 其次/hello/world则分别是类和方法上的@MessageMapping
return stompSession.send(destination, payload);
}
}
4、OkHttps客户端
该客户端是封装了OkHttp类库,直接使用Java而不是模拟JS进行网络连接。
4.1、依赖
OkHttps不依赖Spring环境,可以单独使用,但是为了统一调研环境,所以也是在SpringBoot中进行测试:
</dependencies>
<!--web依赖是为了开放rest api方便测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--OKHttps客户端核心依赖-->
<dependency>
<groupId>com.ejlchina</groupId>
<artifactId>okhttps-jackson</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>com.ejlchina</groupId>
<artifactId>okhttps-stomp</artifactId>
<version>3.1.5</version>
</dependency>
</dependencies>
4.2、配置
package xyz.fanchw.stompdemo.config;
import com.ejlchina.okhttps.OkHttps;
import com.ejlchina.stomp.Stomp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import java.util.List;
/**
* @author fcw
* @since 0.1.0
*/
@Slf4j
@Configuration
@EnableWebSocket
public class OkHttpsClientConfig {
@Bean
public Stomp okHttps(StompConfigEntity stompConfigEntity) {
// 创建一个Stomp实例
final Stomp stomp = Stomp.over(OkHttps.webSocket(stompConfigEntity.getUrl()).heatbeat(5, 5));
// 设置各种回调
stomp.setOnConnected(data -> {
log.info("Stomp Connected {}", data.getMsgCodec());
unsubscribe(stompConfigEntity.getTopics(), stomp);
subscribe(stompConfigEntity.getTopics(), stomp);
})
.setOnDisconnected(data -> {
log.info("Stomp Disconnected {}", data);
stomp.connect();
})
.setOnError(data -> log.info("Error : {}", data))
.setOnException(data -> {
log.warn("Exception!", data);
});
// 尝试连接服务端
stomp.connect();
return stomp;
}
private void subscribe(List<String> topics, Stomp stomp) {
for (String topic : topics) {
try {
// 订阅topic的同时传入handler
stomp.subscribe(topic, null, message -> log.info("Subscribe Handler : {}", message.getPayload()));
} catch (Exception e) {
log.warn("Subscribe failed!", e);
}
}
}
private void unsubscribe(List<String> topics, Stomp stomp) {
for (String topic : topics) {
try {
stomp.unsubscribe(topic);
} catch (Exception e) {
log.warn("Unsubscribe failed!", e);
}
}
}
}
OkHttps创建的Stomp实例封装了客户端和会话,所以无需单独创建会话,连接后可以直接与服务端交互。在订阅topic的时候,每个topic都需要传入对应的handler,这个handler在Stomp中被称作消息回调OnCallback。Stomp实例接收服务端的消息被封装成Message实例:
package com.ejlchina.stomp;
import java.util.List;
public class Message {
private final String command;
private final List<Header> headers;
// 由此可见消息的payload是字符串,一般为json格式。
private final String payload;
public Message(String command, List<Header> headers) {
this(command, headers, null);
}
public Message(String command, List<Header> headers, String payload) {
this.command = command;
this.headers = headers;
this.payload = payload;
}
public List<Header> getHeaders() {
return headers;
}
public String getPayload() {
return payload;
}
public String getCommand() {
return command;
}
public String headerValue(String key) {
Header header = header(key);
if (header != null) {
return header.getValue();
}
return null;
}
public Header header(String key) {
if (headers != null) {
for (Header header : headers) {
if (header.getKey().equals(key)) return header;
}
}
return null;
}
@Override
public String toString() {
return "Message {command='" + command + "', headers=" + headers +", payload='" + payload + "'}";
}
}
4.3、使用
Stomp接收服务端消息,直接调用对应OnCallback处理,而向服务端发送消息的核心方法为:
/**
* 发送消息到指定目的地
* @param destination 目的地
* @param data 消息
*/
public void sendTo(String destination, String data) {
send(new Message(Commands.SEND, Collections.singletonList(new Header(Header.DESTINATION, destination)), data));
}
/**
* 发送消息给服务器
* @param message 消息
*/
public void send(Message message) {
WebSocket ws = websocket;
if (ws == null) {
throw new IllegalArgumentException("You must call connect before send");
}
ws.send(msgCodec.encode(message));
}
最后会封装成Message对象,上面已经介绍过该对象。
5、总结
5.1、服务端
服务端的使用配置都较为简单,没有遇到什么特别的问题。不过依然存在以下问题:
- 服务端没有办法记录历史消息,也就不能记录消息的offset。
- 服务端本身无法集群高可用,不过可以利用SpringCloud实现。
- 基于第二点,多个STOMP服务端转发Kafka消息,如何配置消费组以及消费哪个partition是一个问题。
- 对客户端的权限管控、流量控制和状态记录等一系列问题。
5.2、客户端
客户端目前有两种,模拟JS的客户端(以下简称原生客户端)和OkHttps客户端,它们都有一个相同的问题,那就是无法自动重连。所以针对原生客户端做了一个简单SessionKeeper的重连实现,经过测试后基本可用,而对OkHttps没有实现。这是因为OkHttps客户端还有一个严重的问题,它的onDisconnected()回调函数必须在四次挥手结束,连接彻底断开后触发。但是经过多次尝试,关闭服务端之后一直不会触发,与项目核心成员交流后,认为可能是系统网络设置问题,目前实际原因尚不清晰。