spring-boot支持websocket

spring-boot本身对websocket提供了很好的支持,可以直接原生支持sockjs和stomp协议。百度搜了一些中文文档,虽然也能实现websocket,但是并没有直接使用spring-boot直接支持的websocket的特性。

在实践中觉得stromp协议对于websocket开发的自由度影响比较大。这里给大家展示一种自由度比较大的方案。

主要就是三个组件,config,interceptor和handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableWebSocket
public class MessageWebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(messageWebSocketHandler(), "/sockjs/message")
.addInterceptors(new MessageWebSocketInterceptor()).withSockJS();
}
@Bean
public MessageWebSocketHandler messageWebSocketHandler() {
return new MessageWebSocketHandler();
}
}

config需要继承WebSocketConfigurer需要重写registerWebSocketHandlers方法,指明handler和interceptor。

interceptor顾名思义为拦截器我们可以在websocket建立之间和之后做一些事情。重载beforeHandshakeafterHandshake就OK。我在beforeHandshake这里还操作了attributes。被修改的attributes会被带到后面websocket的session之中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MessageWebSocketInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
String siteId = servletRequest.getServletRequest().getParameter("siteId");
String userId = servletRequest.getServletRequest().getParameter("userId");
if (siteId == null || userId == null) {
return false;
}
attributes.put("siteId", siteId);
attributes.put("userId", userId);
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}

handler里面就可以写websocket的逻辑啦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MessageWebSocketHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) {
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}

spring-boot单元测试可以写websocket-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Application.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WebsocketTest {
private final Logger logger = LoggerFactory.getLogger(WebsocketTest.class);
private final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
@LocalServerPort
private int port;
private SockJsClient sockJsClient;
@Before
public void setup() {
List<Transport> transports = new ArrayList<>();
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());
this.sockJsClient = new SockJsClient(transports);
}
@Test
public void getGreeting() throws Exception {
this.sockJsClient.doHandshake(new TestWebSocketHandler(failure),
"ws://localhost:"+String.valueOf(port)+"/sockjs/message?siteId=webtrn&userId=lucy");
if (latch.await(60, TimeUnit.SECONDS)) {
if (failure.get() != null) {
throw new AssertionError("", failure.get());
}
}
else {
fail("Greeting not received");
}
}
private class TestWebSocketHandler implements WebSocketHandler {
private final AtomicReference<Throwable> failure;
TestWebSocketHandler() {
this.failure = null;
}
;
TestWebSocketHandler(AtomicReference<Throwable> failure) {
this.failure = failure;
}
;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.info("client connection established");
session.sendMessage(new TextMessage("hello websocket server!"));
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
String payload = (String) message.getPayload();
logger.info("client handle message: " + payload);
if (payload.equals("hello websocket client! webtrn lucy")) {
latch.countDown();
}
if (payload.equals("web socket notify")) {
latch.countDown();
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
logger.info("client transport error");
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.info("client connection closed");
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}
}

如果采用stomp协议的话可以参考spring-boot的一个ws-guide。有问题还是直接看spring文档比较好。

spring-boot 和 webpack-dev-server联合开发

当前前后端架构分离的模式比较流行,前端用Nodejs或者ngnix等方式发布与渲染网页,后端程序只提供restful的数据接口。但对于一些小项目来说,并不想让前后端如此分离,还是希望用spring-boot的内置tomcat来serve static content。

如果只是用前端工具的话,webpack是一个很好的打包方式,webpack-dev-server给我们提供了很好的在线调试与修改。但是与spring-boot结合起来就不太协调。这时候就可以用到webpack-dev-server的代理模式了。通过webpack-dev-server来代理spring-boot中tomcat的端口(默认8080)

这里贴出我的一个配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// webpack.config.js
var path = require('path');
var webpack = require('webpack');
var HtmlWebpackPlugin = require('html-webpack-plugin');
module.exports = {
devtool: "source-map",
entry: [
"webpack-dev-server/client?http://localhost:3000",
"webpack/hot/only-dev-server",
"./src/main/web/index.js"
],
output: {
path: "./src/main/resources/static",
filename: "index.js",
publicPath: 'http://localhost:3000/'
},
module: {
loaders: [
{test: /\.css$/, loader: "style!css"},
{
test: /\.js$/, loader: "babel-loader",
exclude: /node_modules/,
query: {
presets: ['es2015']
}
},
{ test: /\.(png|jpg|jpeg|gif|woff)$/, loader: 'url-loader?limit=8192' },
{ test: /\.html$/, loader: 'html'},
]
},
plugins: [
new webpack.HotModuleReplacementPlugin(),
new webpack.NoErrorsPlugin(),
new HtmlWebpackPlugin({
template: './src/main/web/index.tmpl'
})],
devServer: {
port: 3000,
proxy: {
'**': {
target: 'http://localhost:8080',
secure: false,
prependPath: false
}
},
publicPath: 'http://localhost:3000/',
historyApiFallback: true
}
};

在这里我们可以看到,通过webpack-dev-server的3000端口去代理8080端口。在package.json中添加

1
2
3
4
"scripts": {
"webpack": "webpack",
"watch": "webpack-dev-server --inline"
},

之后直接启动spring boot程序,然后npm run watch就可以通过访问3000端口来进行前端的热开发了

spring boot直接返回静态html

通常spring boot的一般教程的例子都是通过模板来返回页面,比如thymeleaf或者freemarker,但是直接返回html的例子比较少。本文参考文章SpringBoot : How to display static html file in Spring boot MVC application。说明如何让spring boot直接返回html。

一般来说resources/static或者resources/public文件夹可以用来提供js,css,图片等文件访问。不经过配置,直接返回html会报404错误。提供静态html访问主要需要如下配置(懒得翻译了。。。)

  • You should create a class that extends WebMvcConfigurerAdapter
  • Your class should have @Configuration annotation.
  • You class should not have @EnableMvc annotation.
  • Override addViewControllers method and add your mapping.
  • Override configurePathMatch method and update suffix path matching

其实,添加如下配置类就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class MvcConfigurer extends WebMvcConfigurerAdapter {
@Override
public void addViewControllers(ViewControllerRegistry registry) {
registry.addViewController("/error").setViewName("error.html");
registry.setOrder(Ordered.HIGHEST_PRECEDENCE);
}
@Override
public void configurePathMatch(PathMatchConfigurer configurer) {
super.configurePathMatch(configurer);
configurer.setUseSuffixPatternMatch(false);
}
}

像@Transactional一样利用注解自定义aop切片

在spring中,利用@Transactional注解可以很轻松的利用aop技术进行事物管理。在实际项目中,直接利用自定义注解实现切片可以大大的提高我们的编码效率以及代码的简洁性。

实现以上的目标,主要涉及两方面工作。

  1. 自定义注解
  2. 将注解声明为切片

自定义注解

介绍注解自定义的文章比较多,这里简要介绍一下以下面的代码为例。该代码要实现一个分布式锁的代码。首先利用@interface来声明该类为接口类,用@Target来声明该注解的作用范围。该例子中ElementType.METHOD, ElementType.TYPE表明该注解作用范围是方法和类。@Retention注明该注解的作用周期。RetentionPolicy.RUNTIME表明该注解在运行时也是有效的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.whaty.lock.annotation;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Component
@Target(value = {ElementType.METHOD, ElementType.TYPE})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface Lock {
public LockImpl lockImpl() default LockImpl.MYSQL;
public enum LockImpl {
MYSQL, ZOOKEEPER
}
}

将注解声明为切片

下面的代码是实现注解切片逻辑的代码。其中@Aspect用来声明切片的实现。在这个代码里面,最关键的一步是
@Around(value = "@annotation(com.whaty.lock.annotation.Lock)")
这个声明与普通的注解式声明切片类似,只是其中@annotation表明该切片作用范围为声明的注解作用范围。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.whaty.lock.aspect;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Component
@Aspect
public class LockAspect {
@Around(value = "@annotation(com.whaty.lock.annotation.Lock)")
void execute(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
// 尝试获取锁
proceedingJoinPoint.proceed();
// 释放锁
}
}

Spring boot中如何定义过滤器Filter

最近刚刚接手使用spring boot,真是一个开发很顺手的工具。在这里总结一下自己发现的基于@Configuration的注解定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package example.hello;
import org.springframework.boot.context.embedded.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class WebConfig {
@Bean
public FilterRegistrationBean greetingFilterRegistrationBean() {
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setName("greeting");
GreetingFilter greetingFilter = new GreetingFilter();
registrationBean.setFilter(greetingFilter);
registrationBean.setOrder(1);
List<String> urlList = new ArrayList<String>();
urlList.add("/abc");
registrationBean.setUrlPatterns(urlList);
return registrationBean;
}
@Bean
public FilterRegistrationBean helloFilterRegistrationBean() {
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setName("hello");
HelloFilter helloFilter = new HelloFilter();
registrationBean.setFilter(helloFilter);
registrationBean.setOrder(2);
return registrationBean;
}
/*
@Bean
@Order(1)
Filter greetingFilter() {
return new GreetingFilter();
}
@Bean
@Order(2)
public Filter helloFilter() {
return new HelloFilter();
}*/
}

其中GreetingFilterHelloFiter是定义的简单的打印字符串过滤器。在@Configuration中,声明注解@Bean相当于在Spring老版本中在配置文件中声明一个Bean。

在这里展示了两种过滤器声明方式,第一种利用FilterRegistrationBean可以详细地更好地详细的定义过滤器。第二种注释掉的,声明方式更简单,代码更加简洁。

在这里也咨询大家一个问题,用第二种方式如何声明UrlPattern呢,貌似没有相关的注解

单数据源访问多数据库的路由开发

在某些可以配置多站点的开发框架中,如果每个站点单独配置了单独的数据库。那么利用单一数据源根据不同的站点切换不同的数据库比较方便。

在这里展示了spring框架下的解决方案。利用了spring的org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource

站点路由的datasource SiteRoutingDataSource

1
2
3
4
5
6
7
8
9
10
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
public class SiteRoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return SiteContextHolder.getSiteCode();
}
}

用来判定当前站点的工具类SiteContextHolder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.util.Assert;
public class SiteContextHolder {
private static final ThreadLocal<String> contextHolder =
new ThreadLocal<String>();
public static void setSiteCode(String siteCode) {
Assert.notNull(siteCode, "siteCode cannot be null");
contextHolder.set(siteCode);
}
public static String getSiteCode() {
return (String) contextHolder.get();
}
public static void clearSiteCode() {
contextHolder.remove();
}
}

spring的xml配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<bean id="hgc"
class="com.mchange.v2.c3p0.ComboPooledDataSource"
destroy-method="close">
<property name="driverClass">
<value>${hgc.datasource.driverClassName}</value>
</property>
<property name="jdbcUrl">
<value>${hgc.datasource.url}</value>
</property>
<property name="user">
<value>${hgc.datasource.username}</value>
</property>
<property name="password">
<value>${hgc.datasource.password}</value>
</property>
</bean>
<bean id="ahpu"
class="com.mchange.v2.c3p0.ComboPooledDataSource"
destroy-method="close">
<property name="driverClass">
<value>${ahpu.datasource.driverClassName}</value>
</property>
<property name="jdbcUrl">
<value>${ahpu.datasource.url}</value>
</property>
<property name="user">
<value>${ahpu.datasource.username}</value>
</property>
<property name="password">
<value>${ahpu.datasource.password}</value>
</bean>
<bean id="dataSource" class="SiteRoutingDataSource">
<property name="targetDataSources">
<map key-type="java.lang.String">
<entry key="hgc" value-ref="hgc"/>
<entry key="ahpu" value-ref="ahpu"/>
</map>
</property>
<property name="defaultTargetDataSource" ref="ahpu"/>
</bean>

在使用过程中 通过 SiteContextHolder.setSiteCode(CODE);来进行数据源选择。在切换数据库之前,需要先SiteContextHolder.clearSiteCode();再进行切换

spring mvc处理表单

在使用spring mvc时 提交表单遇到了如下问题

表单请求的headers通常有两种content type: application/x-www-form-urlencoded和multipart/form-data。前一种类似于get请求用&连接参数,通常适用于字符串,第二种就通常适用于文件和参数混合的类型。

对于第一种请求参数,Spring mvc的大多数例子都默认支持。

对于第二种Controller的函数输入参数可以采用MultipartHttpServletRequest,并在spring配置文件中添加如下配置

1
2
3
4
5
6
7
<bean id="multipartResolver"
class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
<!-- setting maximum upload size -->
<property name="maxUploadSize" value="100000"/>
</bean>

添加如下maven依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>

限制Java线程池运行线程以及等待线程数量的策略

对于java.util.concurrent.Executors所提供的FixedThreadPool,可以保证可以在内存中有固定数量的线程数运行。但是由于FixedThreadPool绑定的是LinkedBlockingQueue。队列的上限没有限制(默认上限为Integer.MAX_VALUE),不断的提交新的线程,会造成任务在内存中长时间的堆积。

我们有可能面临如下的场景,主线程不断地提交任务线程,希望有固定数量的在线程中运行,也不想造成线程在内存中大量的等待堆积。由此需要我们自己定义一个线程池策略。ThreadPoolExecutor为我们线程池的设置提供了很大的灵活性。

首先看FixedThreadPool的实现:

1
2
3
4
5
6
public static ExecutorService More ...newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

可以看到,FixedThreadPool绑定的是LinkedBlockingQueue<Runnable>。我们需要做的第一个改造就是绑定有大小上线的BlockingQueue,在我的实现中绑定ArrayBlockingQueue<Runnable>并设置了size。

第二个是采用CallerRunsPolicy。ThreadPoolExecutor可以定义不同的任务拒绝策略。CallerRunsPolicy指的是当线程池拒绝该任务的时候,线程在本地线程直接execute。这样就限制了本地线程的循环提交流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(10);
RejectedExecutionHandler rejectedExecutionHandler =
new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
workingQueue, rejectedExecutionHandler);
for (int i = 0; i < 100; i++) {
threadPool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
System.out.println("thread " + String.valueOf(threadNo) + " is called");
Thread.sleep(10000);
System.out.println("thread " + String.valueOf(threadNo) + " is awake");
throw new Exception();
}
});
}

代码中定义了大小为10的线程池,for循环提交了20个线程的时候,10个执行线程,10个线程放入了workingQueue。当提交到第21个线程的时候,会触发RejectedExecutionHandler。在这里我们配置了CallerRunsPolicy策略。所以会在主线程直接执行该线程。也就是说,在本程序中最多会有11个线程在执行,10个线程在等待。由此限制了线程池的等待线程数与执行线程数

cassandra查询效率探讨

cassandra目前提倡的建表与查询方式为CQL方式,传统的cassandra-cli相关的api由于性能问题将逐步淘汰,而cassandra-cli也将在2.2版本之后被淘汰。

在CQL中,可以利用类似SQL的方式建立数据表,例如:

1
2
3
4
5
6
CREATE TABLE monitor (
id bigint,
value text,
num int,
timestamp timestamp,
PRIMARY KEY (id, timestamp ));

其中id与timestamp共同构成了primary key。primary key可以不止一个字段,大于一个字段的可以构成clustering key。其中在primary key中第一个字段为partition key,用来决定row在整个ring中的分布。后面的字段为clustering key,对于同一个partition key所代表的行,是根据clustering key以一定顺序在物理上相邻存储的。所以根据partition key以及clustering key进行联合查询速度会比较快。cassandra对于如下查询效率比较高

1
2
3
select * from monitor WHERE id = 1;
select * from monitor WHERE id = 2 AND timestamp = '2015-12-01 12:00:00+0800';
select * from monitor WHERE id = 2 AND timestamp > '2015-12-01 12:00:00+0800' AND timestamp < '2015-12-01 23:00:00+0800';

但是对于下面的查询,cassandra会返回InvalidRequest: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

1
select * from monitor WHERE timestamp = '2015-12-01 12:00:00+0800';

其原因为是cassandra认为这查询效率比较低下,需要用户显式地增加ALLOW FILTERING修饰。这种查询过程是先获取所有行,然后在根据timestamp = '2015-12-01 12:00:00+0800'进行过滤,效率自然比较低。

解决的办法通常有在timestamp字段上建立所以。但不能简单地将cassandra建立索引的机制与普通的关系型数据库如mysql划等号。通过primary key查询,可以通过ring的信息很快的定位到具体的节点。但是通过index查询字段的话,cassandra会每个节点进行查询。虽然节点内部也会对本地数据进行索引,但是效率还是远不如直接查询primary key快。此外cassandra并不能够对于timestamp >'2015-12-01 12:00:00+0800'这种范围条件进行查询。所以更好的方式是另外建立一个表,将需要查询的字段作为主键,并存储对应关系。

参考资料

  1. ALLOW FILTERING explained
  2. A deep look at the CQL WHERE clause
  3. When to use an index

cassandra分页

在cassandra的协议中,没有具体规定查询结果的行数限制。但是对于大的数据集,依然有结果分页的必要。过大的结果集会爆掉服务端或者客户端的内存。

传统的分页方法采用了一点trick,采用了token函数

1
2
SELECT * FROM images LIMIT 100;
SELECT * FROM images WHERE token(image_id) > token([Last image ID received]) LIMIT 100;

这种方式会造成一点编程上的麻烦,一般开发中会重新再封装一次分页的方法。在cassandra2.0的java api中,添加了对于分页的支持,如下所示:

1
2
3
4
5
6
7
8
9
Statement stmt = new SimpleStatement("select * FROM raw_weather_data WHERE wsid= '725474:99999' AND year = 2005 AND month = 6");
stmt.setFetchSize(24);
ResultSet rs = session.execute(stmt);
Iterator<Row> iter = rs.iterator();
while (!rs.isFullyFetched()) {
rs.fetchMoreResults();
Row row = iter.next();
System.out.println(row);
}

参考资料

  1. Things You Should Be Doing When Using Cassandra Drivers
  2. Improvements on the driver side with Cassandra 2.0
一个java web程序员,希望自己两年之内能成为data scientist,正在找工作