简

人生短暂,学海无边,而大道至简。


  • 首页

  • 归档

  • 分类

  • 标签

Tomcat架构

发表于 2018-09-01 | 分类于 Tomcat

Tomcat历史

Tomcat最初有sun公司的架构师James Duncan Davidson开发,名称“JavaWebServer” 1999与Apache软件基金会旗下的JServ项目合并,也就是Tomcat。2001 tomcat4.0 里程碑式的版本。完全重新设计了其架构,并实现了Servlet2.3和JSP 1.2规范。到目前,Tomcat已经成为成熟的Servlet容器产品,并作为 JBoss等应用的服务器的内嵌Servlet容器。

总体结构(结构演变理解)

1、最简单的结构

Server
接受请求并解析,完成相关任务,返回处理结果通常情况下使用Socket监听服务器指定端口来实现该功能,一个最简单的服务设计如下方法:

  • Start():启动服务器,打开socket连接,监听服务端口,接受客户端请求、处理、返回响应
  • Stop():关闭服务器,释放资源
    缺点:请求监听和请求处理放一起扩展性很差(协议的切换 tomcat独立部署使用HTTP协议,与Apache集成时使用AJP协议)

改进:网络协议与请求处理分离

2、Connector和Container

一个Server包含多个Connector(链接器)和Container(容器)

Connector:开启Socket并监听客户端请求,返回响应数据;Container:负责具体的请求处。

缺点:Connector接受的请求由那个Container处理,需要建立映射规则

改进:一个Server可以包含多个Service,每一个Service都是独立的,他们共享一个JVM以及系统类库。

3、Server包含多个Service

一个Service负责维护多个Connector和一个Container,这样来自Connector的请求只能有它所属的Service维护的Container处理。

在这里Container是一个通用的概念,为了明确功能,并与Tomcat中的组件名称相同,可以将Container命名为Engineer

4、Engine、Context

在Engine容器中需要支持管理WEB应用,当接收到Connector的处理请求时,Engine容器能够找到一个合适的Web应用来处理,因此在上面设计的基础上增加Context来表示一个WEB应用,并且一个Engine可以包含多个Context。

缺点:应用服务器需要将每个域名抽象为一个虚拟主机,

5、Host、Wrapper

通常在一个主机下,提供多个域名的服务,因此引入Host

在一个web应用中,可以包含多个Servlet实例来处理来自不同的链接请求,因此我们还需要一个组件概念来表示Servlet定义,即Wrapper。

20200404205355

在前面的Container容器中,Engine、Host、Context、Wrapper是一类组件,这类组件的作用就是处理接收客户端的请求并且返回响应数据,并且有可能请求到Engine容器,Engine容器委托给子容器Host处理。

20200404210742

使用Container代表容器,Engine、Host、Context、Wrapper都是Container的子容器,Container可以维护子容器。backgroundProcess()方法针对后台处理,并且其基础抽象类(ContainerBase)确保在启动组件的同时,异步启动后台处理。

容器之间的组合关系是一种弱依赖,用虚线表示。

每一个组件都有启动、停止等生命周期方法,拥有生命周期的特征。所以定义一个通用的LifeCycle接口,

20200404210830

20200404205607

架构

连接器Connector

1
2
3
4
监听服务器端口,读取来自客户端的请求
使用指定的协议解析请求数据
根据请求地址匹配正确的容器解析处理
讲响应返回给客户端

连接器需要完成 3 个高内聚的功能:

  • 网络通信。
  • 应用层协议解析。
  • Tomcat Request/Response 与 ServletRequest/ServletResponse 的转化。

因此 Tomcat 的设计者设计了 3 个组件来实现这 3 个功能,分别是 EndPoint、Processor 和 Adapter。

Endpoint 和 Processor 放在一起抽象成了 ProtocolHandler 组件,连接器用 ProtocolHandler 来处理网络连接和应用层协议。

20200404215946

EndPoint 是一个接口,它的抽象实现类 AbstractEndpoint 里面定义了两个内部类:Acceptor 和 SocketProcessor。其中 Acceptor 用于监听 Socket 连接请求。SocketProcessor 用于处理接收到的 Socket 请求。

EndPoint 接收到 Socket 连接后,生成一个 SocketProcessor 任务提交到线程池去处理,SocketProcessor 的 Run 方法会调用 Processor 组件去解析应用层协议,Processor 通过解析生成 Request 对象后,会调用 Adapter 的 Service 方法。

20200404220033

容器

Tomcat 设计了 4 种容器,分别是 Engine、Host、Context 和 Wrapper。这 4 种容器不是平行关系,而是父子关系。

Context 表示一个 Web 应用程序;Wrapper 表示一个 Servlet,一个 Web 应用程序中可能会有多个 Servlet;Host 代表的是一个虚拟主机,或者说一个站点,可以给 Tomcat 配置多个虚拟主机地址,而一个虚拟主机下可以部署多个 Web 应用程序;Engine 表示引擎,用来管理多个虚拟站点,一个 Service 最多只能有一个 Engine。

请求定位 Servlet 的过程:Tomcat 会创建一个 Service 组件和一个 Engine 容器组件,在 Engine 容器下创建两个 Host 子容器,在每个 Host 容器下创建两个 Context 子容器。由于一个 Web 应用通常有多个 Servlet,Tomcat 还会在每个 Context 容器里创建多个 Wrapper 子容器。

20200404220250

一个请求在 Tomcat 中流转的过程:

20200404220834

startup.sh 启动 tomcat 的过程:

20200404220908

Coyote

CyoOt是Tomcat的一个连接器组件,支持ajp、http1.1、spdy三种协议。

1、http

默认情况下,HTTP连接器使用Tomcat进行安装,并准备使用。该连接器具有最低的等待时间和最佳的整体性能。
对于集群,必须安装支持Web会话粘性的HTTP负载平衡器,以将流量引导到Tomcat服务器。Tomcat支持Adache HTTP Server 2 .x上的MODJOXER代理,并在默认情况下包含在Apache HTTP服务器2.2中作为负载均衡器。应该注意的是,HTTP代理的性能通常低于AJP的性能,所以AJP聚类通常是优选的。

2、ajp

当使用单个服务器时,在Tomcat实例前面使用 native webserver时的性能大部分时间都比具有默认HTTP连接器的独立Tomcat更差,即使Web应用程序的很大一部分是由静态文件构成的。如果出于任何原因需要与本机WebServer集成,AJP连接器将提供比代理HTTP更快的性能。从Tomcat的角度来看,AJP聚类是最有效的。它在功能上等同于HTTP集群。

Jasper

Tomcat的JSP引擎。解析JSP文件编译成java代码的servlet(可以处理卡特琳娜)。在运行时,Jasper检测到对JSP文件的更改并重新编译它们。

Pipeline和Value

对于应用服务器来说,增强各组件的扩展性以及灵活性是非常重要的,Tomcat采用职责链模式来实现每个Container组件处理请求的功能。

Pipeline代表职责链,后者表示阀门,具体的处理过程,每条线路上包含哪些操作,操作按照顺序一个个执行。Tomcat通过这种方式来决定每个容器的执行过程。

每一个容器都有一个 Pipeline 对象。

20200404220801

Tomcat监听客户端的请求,获得请求后交个各个组件去处理,返回响应数据到客户端,并且Tomcat能支持HTTP、AJP等协议。并且在扩展性、可用性上有着非常优秀的设计。

启动流程

tomcat的启动流程很标准化,入口是BootStrap,统一按照生命周期管理接口Lifecycle的定义进行启动。首先,调用init()方法逐级初始化,接着调用start()方法进行启动,同时,每次调用伴随着生命周期状态变更事件的触发。

20200405225231

请求流程

如果请求为:http://localhost:8080/index.jsp

  1. 请求被发送到本机端口8080,被在那里侦听的Coyote HTTP/1.1 Connector连接器组件获得
  2. Connector把该请求交给它所在的Service的Engine来处理,并等待Engine的回应
  3. Engine获得请求localhost:8080/index.jsp,匹配它所有虚拟主机Host
  4. Engine匹配到名为localhost的Host(即使匹配不到也把请求交给该Host处理,因为该Host被定义为该Engine的默认主机)
  5. localhost Host获得请求/index.jsp,匹配它所拥有的所有Context
  6. Host匹配到路径为/的Context,path=”/“的Context获得请求/index.jsp,在它的mapping table中寻找对应的servlet
  7. Context匹配到URL PATTERN为*.jsp的servlet,对应于JspServlet类
  8. 构造HttpServletRequest对象和HttpServletResponse对象,作为参数调用JspServlet的doGet或doPost方法
  9. Context把执行完了之后的HttpServletResponse对象返回给Host
  10. Host把HttpServletResponse对象返回给Engine
  11. Engine把HttpServletResponse对象返回给Connector
  12. Connector把HttpServletResponse对象返回给客户browser

    类加载机制

    tomcat自定义了三种类加载器。commonLoader、catalinaLoader、sharedLoader。
  • commonLoader:Tomcat最基本的类加载器,加载路径中的class可以被Tomcat容器本身以及各个Webapp访问;
  • catalinaLoader:Tomcat容器私有的类加载器,加载路径中的class对于Webapp不可见;
  • sharedLoader:各个Webapp共享的类加载器,加载路径中的class对于所有Webapp可见,但是对于Tomcat容器不可见;
  • WebappClassLoader:各个Webapp私有的类加载器,加载路径中的class只对当前Webapp可见;

jvm加载器加载机制是双亲委派模型,tomcat 为了实现隔离性,没有遵守这个约定,每个webappClassLoader加载自己的目录下的class文件,不会传递给父类加载器。

Tomcat8 和 Tomcat6比较大的区别是 :Tomcat8可以通过配置 不打破双亲委托 ,类的加载顺序略不同
20200405170917

总结

1
2
3
4
5
6
7
8
9
Server:代表整个Servlet容器
Service:代表Connector集合、Container容器。多个Connector对应一个Container
Container:通用的容器。
Connector:链接器
Engine:Servlet引擎
Host:主机映射
Context:webAPP
Wrapper:代表Servlet
Executor:线程池

以上的配置在Server.xml文件中

spring-boot实现websocket

发表于 2018-08-31 | 分类于 websocket

服务端

服务端的实现,通常有两种方式:

一种用@ServerEndPoint注解来实现;第二种通过请求参数传递采方式,服务端的参数在拦截器中获取之后通过attributes传递给WebSocketHandler,可以添加拦截器在WebSocket连接建立和断开前进行一些其他操作。

其实第一种也就是前面说过的java封装的一种实现。因为在spring-boot中,需要把类注册到容器,还需要配置:

1
2
3
4
5
6
7
8
9
//自动注册所有@ServerEndpoint注解声明的WebsocketEndpoint
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

}

@ServerEndpoint实现的时候,可能会出现注入失败的问题,可以通过手动注入的方式来解决。

1
2
ConfigurableApplicationContext applicationContext = new SpringApplication(Application.class).run(args);
WebSocketController.setApplicationContext(applicationContext); // 再在WebSocketController中获取Bean

第二种实现就是和spring差不多,实现WebSocketHandler,或者更有可能扩展TextWebSocketHandler或BinaryWebSocketHandler。

示例:

pom:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>SpringBoot-websocket</artifactId>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.5</version>
</dependency>
</dependencies>
</project>

java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//配置类
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig implements WebMvcConfigurer,WebSocketConfigurer {

@Autowired
private WebSocketController webSocketController;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketController, "/webSocket")
.addInterceptors(new HandShakeInterceptor()).setAllowedOrigins("*");
}

}
//拦截器
public class HandShakeInterceptor extends HttpSessionHandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(HandShakeInterceptor.class);

/*
* 在WebSocket连接建立之前的操作,以鉴权为例
*/
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//一些鉴权操作
return true;
}

public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception ex) {

}

}
//控制器
@RestController
public class WebSocketController implements WebSocketHandler {

private static AtomicInteger onlineCount = new AtomicInteger(0);

private static final ArrayList<WebSocketSession> sessions = new ArrayList<>();

private final Logger LOGGER = LoggerFactory.getLogger(WebSocketController.class);

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
int onlineNum = onlineCount.incrementAndGet();
LOGGER.info("打开连接,目前连接数: " + onlineNum);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
int onlineNum = onlineCount.decrementAndGet();
LOGGER.info("连接关闭: " + onlineNum);
}

@Override
public void handleMessage(WebSocketSession wsSession, WebSocketMessage<?> message) throws Exception {
LOGGER.info("收到的消息: " + message.toString());
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
LOGGER.error("连接错误");
if (session.isOpen()) {
session.close();
}
sessions.remove(session);
onlineCount.decrementAndGet();
}

/*
* 是否支持消息拆分发送:如果接收的数据量比较大,最好打开(true), 否则可能会导致接收失败。
* 如果出现WebSocket连接接收一次数据后就自动断开,可能这里由问题。
*/
@Override
public boolean supportsPartialMessages() {
return true;
}
}

//
@SpringBootApplication
@ComponentScan("com.hu.demo")
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}

启动

客户端

客户端的实现,通常有html和java WebSocketClient两种方式,html前面已经有,这里用client实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class WebSocketClientDemo extends WebSocketClient {

public WebSocketClientDemo(String url) throws URISyntaxException {
super(new URI(url));
}
public WebSocketClientDemo(URI serverUri, Draft protocolDraft) {
super(serverUri, protocolDraft);
}

@Override
public void onOpen(ServerHandshake shake) {
System.out.println("握手...");
for (Iterator<String> it = shake.iterateHttpFields(); it.hasNext(); ) {
String key = it.next();
System.out.println(key + ":" + shake.getFieldValue(key));
}
}

@Override
public void onMessage(String paramString) {
System.out.println("接收到消息:" + paramString);
}

@Override
public void onClose(int paramInt, String paramString, boolean paramBoolean) {
System.out.println("关闭...");
}

@Override
public void onError(Exception e) {
System.out.println("异常" + e);

}

}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ClientDemo {
public static void main(String[] args) throws Exception{
String serverUrl = "ws://127.0.0.1:8080/webSocket";
URI recognizeUri = new URI(serverUrl);
WebSocketClientDemo client = new WebSocketClientDemo(recognizeUri, new Draft_6455());
client.connect();
while (!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)) {
System.out.println("未打开");
}
System.out.println("建立websocket连接");
client.send("hello");
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
未打开
未打开
未打开
未打开
未打开
未打开
握手...
已经连接
Connection:upgrade
Date:Sat, 04 Apr 2020 09:26:08 GMT
Sec-WebSocket-Accept:SIBN8X8GwCVQmjNH1VNNZnr+WY4=
Upgrade:websocket

WebSocket的spring实现

发表于 2018-08-31 | 分类于 websocket

Spring Framework 提供了一个 WebSocket API,您可以使用它来编写处理 WebSocket 消息的 client-和 server-side applications。

WebSocket API

1. WebSocketHandler

创建 WebSocket 服务器就像实现WebSocketHandler一样简单,或者更有可能扩展TextWebSocketHandler或BinaryWebSocketHandler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyHandler implements WebSocketHandler {
private static final Logger logger = Logger.getLogger(MyHandler.class);
private static final AtomicInteger connectionIds = new AtomicInteger(0);
private Map<String, WebSocketSession> connections = new ConcurrentHashMap<>();
private String userId;

// 建立连接时候触发
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.debug("连接成功!");
this.userId = "user" + connectionIds.getAndIncrement();
session.getAttributes().put(userId, userId);
connections.putIfAbsent(userId, session);
}

// 处理消息
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
throws Exception {
// 防止中文乱码
String msg = URLDecoder.decode(webSocketMessage.getPayload().toString(), "utf-8");
// 简单模拟群发消息
TextMessage reply = new TextMessage(userId + " : " + msg);
connections.forEach((s, session) -> {
try {
session.sendMessage(reply);
} catch (IOException e) {
e.printStackTrace();
}
});

}

public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
if (webSocketSession.isOpen()) {
webSocketSession.close();
}
logger.debug("连接出错...关闭!");

}

// 关闭连接时候触发
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
connections.remove(userId);
logger.debug("连接关闭!" + closeStatus.toString());
}

@Override
public boolean supportsPartialMessages() {
return false;
}
}

使用xml或者Java配置方式配置Bean。

2.WebSocket 握手

自定义初始 HTTP WebSocket 握手请求的最简单方法是通过HandshakeInterceptor,它在握手之前“之前”和“之后”暴露方法。您可以使用此类拦截器来阻止握手或使WebSocketSession可以使用任何属性。以下 example 使用 built-in 拦截器将 HTTP session 属性传递给 WebSocket session:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 配置WebSocket 也可以使用xml配置
*
* Spring 的 WebSocket 支持不依赖于 Spring MVC。在WebSocketHttpRequestHandler的帮助下将WebSocketHandler集成到其他 HTTP 服务环境中相对简单。
*/
@Configuration
@EnableWebSocket // 开启websocket
public class WebSocketConfig implements WebSocketConfigurer {

/**
* 自定义初始 HTTP WebSocket 握手请求的最简单方法是通过HandshakeInterceptor,它将握手方法“之前”和“之后”。
* 这样的拦截器可用于排除握手或使WebSocketSession可用的任何属性。用于将 HTTP session 属性传递给 WebSocket session:
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler");
}

@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
//HttpSessionHandshakeInterceptor拦截器,也可以选择使用。

3. 部署

web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>websocket</display-name>

<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:springMVCConfig.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springmvc</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>

<!--处理乱码-->
<filter>
<filter-name>characterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>

4. 服务器 Configuration

每个底层 WebSocket 引擎都会公开控制运行时特征的 configuration properties,例如消息缓冲区大小,idle 超时等等。

对于 Tomcat,WildFly 和 GlassFish,您可以将ServletServerContainerFactoryBean添加到 WebSocket Java 配置中

1
2
3
4
5
6
7
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}

5.跨域问题

从 Spring Framework 4.1.5 开始,WebSocket 和 SockJS 的默认行为是仅接受 same-origin 请求。也可以允许所有或指定的起源列表。

三种情况:

  • 仅允许 same-origin 个请求(默认):在此模式下,启用 SockJS 时,Iframe HTTP 响应头X-Frame-Options设置为SAMEORIGIN,并且禁用 JSONP 传输,因为它不允许检查请求的来源。因此,启用此模式时不支持 IE6 和 IE7。
  • 允许指定的原始列表:每个允许的原点必须以http://或https://开头。在此模式下,启用 SockJS 时,将禁用 IFrame 传输。因此,启用此模式时,不支持 IE6 到 IE9。
  • 允许所有来源:要启用此模式,您应提供*作为允许的原始值。在此模式下,所有传输都可用。

配置 WebSocket 和 SockJS 允许的源:

1
2
3
4
 @Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("http://mydomain.com");
}

SockJS

有一些浏览器中缺少对WebSocket的支持,而SockJS是一个浏览器的JavaScript库,它提供了一个类似于网络的对象,SockJS提供了一个连贯的,跨浏览器的JavaScriptAPI,它在浏览器和Web服务器之间创建了一个低延迟、全双工、跨域通信通道。SockJS的一大好处在于提供了浏览器兼容性。即优先使用原生WebSocket,如果浏览器不支持WebSocket,会自动降为轮询的方式。

启用 SockJS

1
2
3
4
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").withSockJS();
}

WebSocket原理和实现

发表于 2018-08-31 | 分类于 websocket

简单介绍

WebSocket 协议在2008年诞生,2011年成为国际标准。现在所有浏览器都已经支持了。WebSocket 的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,双工模式。

WebSocket是HTML5出(协议),也就是说HTTP协议没有变化,或者说没关系,但HTTP是不支持持久连接的(长连接,循环连接的不算)HTTP 有 1.1 和 1.0 之说,也就是所谓的 keep-alive ,把多个 HTTP 请求合并为一个,但是 Websocket 其实是一个新协议,跟 HTTP 协议基本没有关系,只是为了兼容现有浏览器,所以在握手阶段使用了 HTTP 。

20200328182707

Websocket是一个持久化的协议,相对于HTTP这种非持久的协议来说。

WebSocket 建立在 TCP 协议之上,服务器端的实现比较容易。与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。

数据格式比较轻量,性能开销小,通信高效。可以发送文本,也可以发送二进制数据。没有同源限制,客户端可以与任意服务器通信。协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

原理

首先 WebSocket 是基于 HTTP 协议的,或者说借用了 HTTP 协议来完成一部分握手。

1
2
3
4
5
6
7
8
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com

这段类似 HTTP 协议的握手请求中,多了这么几个东西。

1
2
Upgrade: websocket
Connection: Upgrade

这就是 WebSocket 的核心了,告诉 Apache 、 Nginx 等服务器,发起的请求要用 WebSocket 协议,找到对应处理而不是 HTTP。

1
2
3
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

首先, Sec-WebSocket-Key 是一个 Base64 encode 的值,这个是浏览器随机生成的,告诉服务器来验证是否是WebSocket服务。

然后, Sec_WebSocket-Protocol 是一个用户定义的字符串,用来区分同 URL 下,不同的服务所需要的协议。

最后, Sec-WebSocket-Version 是告诉服务器所使用的 WebSocket Draft (协议版本),在最初的时候,WebSocket 协议还在 Draft 阶段,各种不同的协议都有,Firefox 和 Chrome 用的不是一个版本之类的,当初 WebSocket 协议太多可是一个大难题。

然后服务器会返回下列东西,表示已经接受到请求, 成功建立 WebSocket:

1
2
3
4
5
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat

这里开始就是 HTTP 最后负责的部分。告诉了客户端,成功转换协议WebSocket协议。

1
2
Upgrade: websocket
Connection: Upgrade

告诉客户端即将升级的是 WebSocket 协议。 Sec-WebSocket-Accept 这个则是经过服务器确认,并且加密过后的 Sec-WebSocket-Key , Sec-WebSocket-Protocol 则是表示最终使用的协议。

实现

在没有WebSocket之前,我们是怎么处理长连接的情况,是使用ajax轮询 和 long poll。大多数 Web 应用程序将通过频繁的异步 JavaScript 和 XML(AJAX)请求实现长轮询。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。

  • ajax轮询
    ajax轮询的原理非常简单,让浏览器隔个几秒就发送一次请求,询问服务器是否有新信息。
  • poll
    long poll 其实原理跟 ajax轮询 差不多,都是采用轮询的方式,不过采取的是阻塞模型,客户端发起请求后,如果没消息,就一直不返回 Response 给客户端。直到有消息才返回,返回完之后,客户端再次建立连接,周而复始。

这两种方式,都是在不断地建立HTTP连接,然后等待服务端处理,可以体现HTTP协议的另外一个特点,被动性:服务端不能主动联系客户端,只能有客户端发起。

ajax轮询 需要服务器有很快的处理速度和资源。long poll 需要有很高的并发。

  • WebSocket 如何工作?

Web 浏览器和服务器都必须实现 WebSockets 协议来建立和维护连接。由于 WebSockets 连接长期存在,与典型的 HTTP 连接不同,对服务器有重要的影响。

基于多线程或多进程的服务器无法适用于 WebSockets,因为它旨在打开连接,尽可能快地处理请求,然后关闭连接。任何实际的 WebSockets 服务器端实现都需要一个异步服务器。

WebSocket 客户端

在客户端,没有必要为 WebSockets 使用 JavaScript 库。实现 WebSockets 的 Web 浏览器将通过 WebSockets 对象实现客户端功能(Html5已经实现了WebSocket)。

API

以下 API 用于创建 WebSocket 对象。

1
2
//第一个参数 url, 指定连接的 URL。第二个参数 protocol 是可选的,指定了可接受的子协议。
var Socket = new WebSocket(url, [protocol] );

WebSocket 属性
属性 | 描述
——- | ——-
Socket.readyState | 只读属性 readyState 表示连接状态,CONNECTING:值为0,表示正在连接。OPEN:值为1,表示连接成功,可以通信了。CLOSING:值为2,表示连接正在关闭。CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
Socket.bufferedAmount | 只读属性 bufferedAmount 已被 send() 放入正在队列中等待传输,但是还没有发出的 UTF-8 文本字节数。
WebSocket 事件
事件 | 事件处理程序 | 描述
——- | ——- | ——-
open | Socket.onopen | 连接建立时触发
message | Socket.onmessage | 客户端接收服务端数据时触发
error | Socket.onerror | 通信发生错误时触发
close | Socket.onclose | 连接关闭时触发
WebSocket 方法
方法 | 描述
—|—
Socket.send() | 使用连接发送数据
Socket.close() | 关闭连接

WebSocket 服务端

WebSocket 在服务端的实现非常丰富。Node.js、Java、C++、Python 等多种语言都有自己的解决方案。
语言框架 | 实现
——- | ——-
Node.js | 3种实现:µWebSockets、Socket.IO、WebSocket-Node
Java | javaweb通常依托servlet 容器:Tomcat7、Jetty7 及以上版本均开始支持 WebSocket。Spring 框架对 WebSocket 也提供了支持。

它们都遵循RFC6455 的通信标准,并且 Java API 统一遵循 JSR 356 - JavaTM API for WebSocket 规范。所以,在实际编码中,API 差异不大。

示例

新建一个maven项目引入包:

1
2
3
4
5
6
7
8
9
<!-- javaweb实现的websocket -->
<dependencies>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
<scope>provided</scope>
</dependency>
</dependencies>

页面文件js:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<script type="text/javascript">
var websocket = null;
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
websocket = new WebSocket("ws://localhost:8080/demo/demo1");
} else {
alert('Not support websocket')
}

websocket.onerror = function () {
setMessageInnerHTML("WebSocket连接发生错误");
};

websocket.onopen = function () {
setMessageInnerHTML("WebSocket连接成功");
}

//接收到消息的回调方法
websocket.onmessage = function (event) {
setMessageInnerHTML(event.data);
}

websocket.onclose = function () {
setMessageInnerHTML("WebSocket连接关闭");
}

//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接。
window.onbeforeunload = function () {
closeWebSocket();
}

function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}

function closeWebSocket() {
websocket.close();
}

function send() {
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>

Java web服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* @ServerEndpoint 注解是将目前的类定义成一个websocket服务器端
*/
@ServerEndpoint("/demo1")
public class WebSocketServiceDemo {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;

/**
* 连接建立成功调用的方法
*
* @param session 可选的参数。
* session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
System.out.println("有新连接加入!");
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
System.out.println("有一连接关闭!");
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
//回复消息
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
error.printStackTrace();
}

}

idea中部测试:

创建webapp目录和web.xml
20200328185021
创建artifacts
20200328185303
配置服务器:

选择Deployment

运行:

1
2
3
4
有新连接加入!
来自客户端的消息:你好
有新连接加入!
来自客户端的消息:a你好

把它改成一个聊天室的模式
用户可以看到加入的其他用户发的消息,后台代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
@ServerEndpoint(value = "/demo1", encoders = {ServerEncoder.class})
public class WebSocketServiceDemo {
private static final AtomicInteger connectionIds = new AtomicInteger(0);
// 存放用户的websocket
private static final Set<WebSocketServiceDemo> connections = new CopyOnWriteArraySet<>();

private final String nickname;
private Session session;

public WebSocketServiceDemo() {
nickname = "用户" + connectionIds.getAndIncrement();
}

/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
connections.add(this);
String message = String.format(" %s %s", nickname, "加入聊天室");
// 上线通知
broadcast(0,message);
try {
// 系统问候语
sendHello(this);
// 返回在线用户
onlineList();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
connections.remove(this);
connectionIds.decrementAndGet();
try {
onlineList();
} catch (Exception e) {
e.printStackTrace();
}
String message = String.format(" %s %s", nickname, "退出聊天室");
broadcast(0,message);
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
// TODO: 过滤输入的内容
String m = String.format(" %s %s", nickname, message);
if (m.contains("To")) {
// 点对点发送 消息中包含 To
broadcastOneToOne(3,m);
} else {
// 群发
broadcast(1,m);
}
}

/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
error.printStackTrace();
}

/**
* 消息广播 通过connections,对所有其他用户推送信息的方法
*
* @param msg
*/
private void broadcast(int type, String msg) {
Message message = new Message(type, msg);
for (WebSocketServiceDemo client : connections) {
try {
synchronized (client) {
client.session.getBasicRemote().sendObject(message);
}
} catch (Exception e) {
System.out.println("错误:向客户端发送消息失败");
connections.remove(client);
try {
client.session.close();
} catch (Exception e1) {
e1.printStackTrace();
}
String msg1 = String.format(" %s %s", client.nickname, "退出聊天室");
broadcast(0,msg1);
}
}
}

/**
* 点对点发送消息
*/
private void broadcastOneToOne(int type, String msg) {
String[] arr = msg.split("To");
String toNickName=arr[1];
Message message = new Message(type, arr[0]);
boolean isSend = false;
for (WebSocketServiceDemo client : connections) {
try {
if (toNickName.equals(client.nickname)) {
synchronized (client) {
try {
client.session.getBasicRemote().sendObject(message);
this.session.getBasicRemote().sendObject(message);
isSend = true;
} catch (EncodeException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
System.out.println("错误:向用户发送消息失败");
connections.remove(client);
try {
client.session.close();
} catch (IOException e1) {
e1.printStackTrace();
}
broadcast(0, String.format(" %s %s", client.nickname, "退出聊天室"));
}
}
if(!isSend){
try {
this.session.getBasicRemote().sendObject(new Message(0, "该用户不存在"));
} catch (IOException e) {
e.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
}

// 系统问候语
private void sendHello(WebSocketServiceDemo client) throws Exception {
String m = String.format(" %s 你好!!", client.nickname);
Message message = new Message(0, m);
client.session.getBasicRemote().sendObject(message);
}

// 在线用户,所有的用户一起推送。
private void onlineList() throws Exception {
String online = "";
for (WebSocketServiceDemo client : connections) {
if (online.equals("")) {
online = client.nickname;
} else {
online += "</br>" + client.nickname;
}
}
Message message = new Message(2, online);
for (WebSocketServiceDemo client : connections) {
client.session.getBasicRemote().sendObject(message);
}
}

public static class Message implements Serializable {
private int type;//消息类型
private String msg;//消息内容

public Message(int type, String msg) {
this.type = type;
this.msg = msg;
}

public int getType() {
return type;
}

public void setType(int type) {
this.type = type;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}
}

由于在运行时报错:javax.websocket.EncodeException: No encoder specified for object of class
因为,使用client.session.getBasicRemote().sendObject(obj);发送对象数据。解决办法:

新增一个类,并且在主类注解中加入代码:encoders = {ServerEncoder.class}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ServerEncoder implements Encoder.Text<WebSocketServiceDemo.Message> {

@Override
public void destroy() {
}

@Override
public void init(EndpointConfig arg0) {
}

@Override
public String encode(WebSocketServiceDemo.Message messagepojo) throws EncodeException {
return JSONObject.toJSONString(messagepojo);
}
}

页面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<html>
<head>
<title>WebSocket聊天室的实现</title>
<script type="text/javascript">
var websocket = null;
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
websocket = new WebSocket("ws://localhost:8080/demo/demo1");
} else {
alert('Not support websocket')
}

websocket.onerror = function () {
document.getElementById('tps').innerHTML = '连接错误!';
};

websocket.onopen = function () {
document.getElementById('tps').innerHTML = '连接成功!';
}

//接收到消息的回调方法
websocket.onmessage = function (event) {
console.log(event.data);
let obj = JSON.parse(event.data);
if(obj.type == 0){//系统消息
console.log("syso");
setMessageInnerHTML("<span style='color: crimson;font-size: 12px'>系统:"+obj.msg+"</span>");
}else if(obj.type == 1){
setMessageInnerHTML("<span style='color: black;font-size: 14px'>>:"+obj.msg+"</span>");
}else if(obj.type == 2){//成员列表
document.getElementById('users').innerHTML = obj.msg + '<br/>';
}else if(obj.type == 3){//私发
setMessageInnerHTML("<span style='color: black;font-size: 14px'>>:"+obj.msg+"</span>");
}
}

websocket.onclose = function () {
document.getElementById('tps').innerHTML = '连接关闭!';
}

//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接。
window.onbeforeunload = function () {
closeWebSocket();
}

function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}

function closeWebSocket() {
websocket.close();
}

function send() {
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</head>
<body>
<input id="text" type="text"/>
<button onclick="send()">发送消息</button> <button onclick="closeWebSocket()">关闭连接</button>
<span id="tps" style="color: crimson"></span>
<hr/>
<div id="message" style="width: 80%; float:left; display: block; border: 1px solid #1353C2;"></div>
<div style="width: 18%; float: right;display: block; border: 1px solid #1353C2;">
<div >成员:</div>
<div id="users" style="font-size: 13px"></div>
</div>
</body>

测试成功:

多用户多聊天室

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@ServerEndpoint(value = "/demo/{roomId}/{userId}", encoders = {ServerEncoder.class})
public class WebSocketServiceDemo1 {
private static final Map<String, Set<WebSocketServiceDemo1>> rooms = new ConcurrentHashMap<>();
private String roomId;
private String userId;
private Session session;

public WebSocketServiceDemo1() {

}

/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
this.roomId = roomId;
if(rooms.get(roomId)==null){
Set<WebSocketServiceDemo1> socketServices = new CopyOnWriteArraySet<>();
socketServices.add(this);
rooms.put(roomId,socketServices);
}else {
rooms.get(roomId).add(this);
}
String message = String.format(" %s %s", userId, "加入聊天室");
// 上线通知
broadcast(0,message);
try {
// 系统问候语
sendHello(this);
// 返回在线用户
onlineList();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
System.out.println(this.userId+"退出");
rooms.get(roomId).remove(this);
try {
onlineList();
} catch (Exception e) {
e.printStackTrace();
}
String message = String.format(" %s %s", userId, "退出聊天室");
broadcast(0,message);
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
// TODO: 过滤输入的内容
String m = String.format(" %s %s", userId, message);
if (m.contains("To")) {
// 点对点发送 消息中包含 To
broadcastOneToOne(3,m);
} else {
// 群发
broadcast(1,m);
}
}

/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
error.printStackTrace();
}

/**
* 消息广播 通过connections,对所有其他用户推送信息的方法
*
* @param msg
*/
private void broadcast(int type, String msg) {
Message message = new Message(type, msg);
Set<WebSocketServiceDemo1> connections = rooms.get(roomId);
for (WebSocketServiceDemo1 client : connections) {
try {
synchronized (client) {
client.session.getBasicRemote().sendObject(message);
}
} catch (Exception e) {
System.out.println("错误:向客户端发送消息失败");
connections.remove(client);
try {
client.session.close();
} catch (Exception e1) {
e1.printStackTrace();
}
String msg1 = String.format(" %s %s", client.userId, "退出聊天室");
broadcast(0,msg1);
}
}
}

/**
* 点对点发送消息
*/
private void broadcastOneToOne(int type, String msg) {
String[] arr = msg.split("To");
String toNickName=arr[1];
Message message = new Message(type, arr[0]);
boolean isSend = false;
Set<WebSocketServiceDemo1> connections = rooms.get(roomId);
for (WebSocketServiceDemo1 client : connections) {
try {
if (toNickName.equals(client.userId)) {
synchronized (client) {
try {
client.session.getBasicRemote().sendObject(message);
this.session.getBasicRemote().sendObject(message);
isSend = true;
} catch (EncodeException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
System.out.println("错误:向用户发送消息失败");
connections.remove(client);
try {
client.session.close();
} catch (IOException e1) {
e1.printStackTrace();
}
broadcast(0, String.format(" %s %s", client.userId, "退出聊天室"));
}
}
if(!isSend){
try {
this.session.getBasicRemote().sendObject(new Message(0, "该用户不存在"));
} catch (IOException e) {
e.printStackTrace();
} catch (EncodeException e) {
e.printStackTrace();
}
}
}

// 系统问候语
private void sendHello(WebSocketServiceDemo1 client) throws Exception {
String m = String.format(" %s 你好!!", client.userId);
Message message = new Message(0, m);
client.session.getBasicRemote().sendObject(message);
}

// 在线用户,所有的用户一起推送。
private void onlineList() throws Exception {
String online = "";
Set<WebSocketServiceDemo1> connections = rooms.get(roomId);
for (WebSocketServiceDemo1 client : connections) {
if (online.equals("")) {
online = client.userId;
} else {
online += "</br>" + client.userId;
}
}
Message message = new Message(2, online);
for (WebSocketServiceDemo1 client : connections) {
client.session.getBasicRemote().sendObject(message);
}
}

public static class Message implements Serializable {
private int type;//消息类型
private String msg;//消息内容

public Message(int type, String msg) {
this.type = type;
this.msg = msg;
}

public int getType() {
return type;
}

public void setType(int type) {
this.type = type;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}
}

index:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
<%@ page language="java" pageEncoding="UTF-8" %>
<!DOCTYPE html>
<html>
<head>
<title>WebSocket聊天室的实现</title>
<script type="text/javascript">
function soc() {
let roomId = document.getElementById("roomId").value;
let userId = document.getElementById("userId").value;
if(roomId=='' || userId==''){
document.getElementById('tps').innerHTML = '房间号和用户id不能为空!';
return;
}
socket(roomId,userId);
}
var websocket = null;
function socket(roomId,userId) {
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
websocket = new WebSocket("ws://localhost:8080/demo/demo/"+roomId+"/"+userId);
} else {
alert('Not support websocket')
}

websocket.onerror = function () {
document.getElementById('tps').innerHTML = '连接错误!';
};

websocket.onopen = function () {
document.getElementById('tps').innerHTML = '连接成功!';
}

//接收到消息的回调方法
websocket.onmessage = function (event) {
console.log(event.data);
let obj = JSON.parse(event.data);
if(obj.type == 0){//系统消息
console.log("syso");
setMessageInnerHTML("<span style='color: crimson;font-size: 12px'>系统:"+obj.msg+"</span>");
}else if(obj.type == 1){
setMessageInnerHTML("<span style='color: black;font-size: 14px'>>:"+obj.msg+"</span>");
}else if(obj.type == 2){//成员列表
document.getElementById('users').innerHTML = obj.msg + '<br/>';
}else if(obj.type == 3){//私发
setMessageInnerHTML("<span style='color: black;font-size: 14px'>>:"+obj.msg+"</span>");
}
}

websocket.onclose = function () {
document.getElementById('tps').innerHTML = '连接关闭!';
}

}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接。
window.onbeforeunload = function () {
closeWebSocket();
}

function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}

function closeWebSocket() {
websocket.close();
}

function send() {
var message = document.getElementById('text').value;
websocket.send(message);
}


</script>
</head>
<body>
聊天室ID:<input id="roomId" type="text"/>
用户ID:<input id="userId" type="text"/>
<button onclick="soc()">连接聊天室</button>
<hr/>
输入:<input id="text" type="text"/> <button onclick="send()">发送消息</button> <button onclick="closeWebSocket()">关闭连接</button>
<span id="tps" style="color: crimson"></span>
<hr/>
<div id="message" style="width: 80%; float:left; display: block; border: 1px solid #1353C2;"></div>
<div style="width: 18%; float: right;display: block; border: 1px solid #1353C2;">
<div >成员:</div>
<div id="users" style="font-size: 13px"></div>
</div>
</body>
</html>

测试完成。

简单的websocket就完成。后面介绍websocket spring或springboot实现。

Mysql跨N库分页

发表于 2018-08-23 | 分类于 mysql

方法一:全局法

(1)将order by time offset X limit Y,改写成order by time offset 0 limit X+Y

(2)服务层对得到的N*(X+Y)条数据进行内存排序,内存排序后再取偏移量X后的Y条记录

这种方法随着翻页的进行,性能越来越低。

方法二:业务折衷法-禁止跳页查询

(1)用正常的方法取得第一页数据,并得到第一页记录的time_max

(2)每次翻页,将order by time offset X limit Y,改写成order by time where time>$time_max limit Y

以保证每次只返回一页数据,性能为常量。

方法三:业务折衷法-允许模糊数据

(1)将order by time offset X limit Y,改写成order by time offset X/N limit Y/N

方法四:二次查询法

(1)将order by time offset X limit Y,改写成order by time offset X/N limit Y

(2)找到最小值time_min

(3)between二次查询,order by time between $time_min and $time_i_max

(4)设置虚拟time_min,找到time_min在各个分库的offset,从而得到time_min在全局的offset

(5)得到了time_min在全局的offset,自然得到了全局的offset X limit Y

Git分支切换

发表于 2018-08-21 | 分类于 工具

本地库上传到远程库

20200421153225

创建本地库

$ git init
Initialized empty Git repository in H:/Admin桌面}/临时}Git/config-repo-demo/.git/

新增文件后查看状态

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git status
On branch master

No commits yet

Untracked files:
(use "git add <file>..." to include in what will be committed)

    config-client-dev.yml
    config-client.yml

nothing added to commit but untracked files present (use "git add" to track)

将当前目录所有文件提交本地git仓库

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git add .

提交版本信息到本地库

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git commit -m "添加配置信息}"
[master (root-commit) b19476a] 添加配置信息
2 files changed, 4 insertions(+)
create mode 100644 config-client-dev.yml
create mode 100644 config-client.yml

将本地的仓库关联到GitHub

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git remote add origin https://github.com/huingsn/config-repo-demo.git

将远程库下载到本地

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git clone https://github.com/huingsn/config-repo-demo.git
Cloning into 'config-repo-demo'...
remote: Enumerating objects: 3, done.
remote: Counting objects: 100% (3/3), done.
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (3/3), done.

上传github之前pull一下

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git pull --rebase origin master
warning: no common commits
remote: Enumerating objects: 3, done.
remote: Counting objects: 100% (3/3), done.
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (3/3), done.
From https://github.com/huingsn/config-repo-demo
* branch            master     -> FETCH_HEAD
* [new branch]      master     -> origin/master
First, rewinding head to replay your work on top of it...
Applying: 添加配置信息

上传代码到GitHub远程仓库

hu@hu MINGW64 /h/Admin桌面}/临时}Git/config-repo-demo (master)
$ git push -u origin master
Enumerating objects: 4, done.
Counting objects: 100% (4/4), done.
Delta compression using up to 4 threads
Compressing objects: 100% (2/2), done.
Writing objects: 100% (3/3), 341 bytes | 341.00 KiB/s, done.
Total 3 (delta 0), reused 0 (delta 0)
To https://github.com/huingsn/config-repo-demo.git
b67a80d..4191bba  master -> master
Branch 'master' set up to track remote branch 'master' from 'origin'.

完毕!

上面是本地和远程都新建–关联–同步

如果是已经同步好的库,本地做了修改,需要同步,只需要这几个步骤:

1、查看当前的git仓库状态
        git status
2、更新全部
git add *
3、接着输入git commit -m "更新说明"
        git commit -m "更新说明"
4、先git pull,拉取当前分支最新代码
        git pull
5、push到远程master分支上
        git push origin master:master

master为我自己的分支的名称,实际应用中,你要改成自己的分支的名称

打开GitHub已经同步了

切换分支

1、查看远程分支

$ git branch -a
* master
remotes/origin/HEAD -> origin/master
remotes/origin/config-label-test
remotes/origin/master

可以看到,我们现在在master分支下

2、查看本地分支

$ git branch
* master

3、切换分支

$ git checkout -b config-label-test
Switched to a new branch 'config-label-test'
$ git branch
* config-label-test
master

4、切换回master分支

$ git checkout master
Switched to branch 'master'
Your branch is up to date with 'origin/master'.

git status 查看分支状态

回退版本

$ git reset --hard HEAD
git reset的作用是修改HEAD的位置,即将HEAD指向的位置改变为之前存在的某个版本

git revert的作用通过反做创建一个新的版本,这个版本的内容与我们要回退到的目标版本一样,但是HEAD指针是指向这个新生成的版本,而不是目标版本。

$ git fetch 相当于是从远程获取最新到本地,不会自动merge

git pull:相当于是从远程获取最新版本并merge到本地,在实际使用中,git fetch更安全一些

git reflog查看所有历史提交记录其中包括已用reset命令删除的提交

在vs中每次更新代码都会要输入账号密码,方便起见,可以配置一下让GIT记住密码账号。

git config –global credential.helper store //在Git Bash输入这个命令就可以了

git修改commit时间

git commit –amend –date=”Thu Aug 9 23:04:57 2018 -0700”

Spring Cloud Sleuth

发表于 2018-08-14 | 分类于 springcloud

随着分布式服务架构的流行,特别是微服务等设计理念在系统中的应用,业务的调用链越来越复杂。
系统规模也会变得越来越大,各微服务间的调用关系也变得越来越复杂。通常一个由客户端发起的请求在后端系统中会经过多个不同的微服务调用来协同产生最后的请求结果,在复杂的微服务架构系统中,几乎每一个前端请求都会形成一个复杂的分布式服务调用链路,在每条链路中任何一个依赖服务出现延迟过高或者错误都有可能引起请求最后的失败。同时,缺乏一个自上而下全局的调用id,如何有效的进行相关的数据分析工作?
分布式服务跟踪是整个分布式系统中跟踪一个用户请求的过程(包括数据采集、数据传输、数据存储、数据分析、数据可视化),捕获此类跟踪让我们构建用户交互背后的整个调用链的视图,这是调试和监控微服务的关键工具。Spring Cloud Sleuth是Spring Cloud为分布式服务跟踪提供的解决方案,有了它,我们可以:
提供链路追踪,故障快速定位:可以通过调用链结合业务日志快速定位错误信息。
可视化各个阶段耗时,进行性能分析
各个调用环节的可用性、梳理服务依赖关系以及优化
数据分析,优化链路:可以得到用户的行为路径,汇总分析应用在很多业务场景。

一个简单例子

准备注册中心,创建一个微服务应用:sleuth-trace-1,实现rest接口trace1,并且这个接口调用了sleuth-trace-2。
代码见sleuth-trace-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-sleuth</artifactId>
<groupId>com.hu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sleuth-trace-1</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.6</version>
</dependency>
</dependencies>

</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RestController
@EnableDiscoveryClient
@SpringBootApplication
public class TraceApplicationOne {
private final Logger logger = Logger.getLogger(getClass());

public static void main(String[] args) {
SpringApplication.run(TraceApplicationOne.class, args);
}

@Bean
@LoadBalanced//负载
RestTemplate restTemplate() {
return new RestTemplate();
}

@RequestMapping(value = "/trace1", method = RequestMethod.GET)
public String trace() {
logger.info("---------call sleuth-trace-1------------");
return restTemplate().getForEntity("http://sleuth-trace-2/trace2", String.class).getBody();
}

}

配置:

1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 7904
spring:
application:
name: sleuth-trace-1
eureka:
client:
serviceUrl:
defaultZone: http://user:123456@localhost:8761/eureka
logging:
level:
root: INFO

创建sleuth-trace-2
依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-sleuth</artifactId>
<groupId>com.hu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sleuth-trace-1</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.6</version>
</dependency>
</dependencies>

</project>

配置:

1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 7906
spring:
application:
name: sleuth-trace-2
eureka:
client:
serviceUrl:
defaultZone: http://user:123456@localhost:8761/eureka
logging:
level:
root: info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
@EnableDiscoveryClient
@SpringBootApplication
public class TraceApplicationTwo {
private final Logger logger = Logger.getLogger(getClass());

public static void main(String[] args) {
SpringApplication.run(TraceApplicationTwo.class, args);
}

@RequestMapping(value = "/trace2", method = RequestMethod.GET)
public String trace(HttpServletRequest request) {
logger.info("---------------call sleuth-trace-2 -------------------");
logger.info("----------------------"+ request.getHeader("X-B3-TraceId")+ request.getHeader("X-B3-SpanId"));
return "Trace";
}
}

之后启动sleuth-trace-2和sleuth-trace-1
通过postman访问http://localhost:7904/trace1
控制台分别显示信息:

1
2
[nio-7904-exec-4] tionOne$$EnhancerBySpringCGLIB$$a7e32e75 : ---------call sleuth-trace-1------------
[nio-7906-exec-1] tionTwo$$EnhancerBySpringCGLIB$$178e18f5 : ---------------call sleuth-trace-2 -------------------

以上准备工作完成后,来实现跟踪

实现跟踪

为sleuth-trace-2和sleuth-trace-1服务添加跟踪功能,通过Spring Cloud Sleuth,只需要在两个服务中增加依赖spring-cloud-starter-sleuth即可。

重启后,在访问,日志信息中就显示一些sleuth的信息,如

1
2
3
4
5
6
 [sleuth-trace-2,ced24759781b822f,b2c24565840b3207,false] 15288 --- [nio-7906-exec-9] ationTwo$$EnhancerBySpringCGLIB$$1b09342 : ---------------call sleuth-trace-2 -------------------

第一个值:sleuth-trace-2,它记录了应用的名称,也就是application.properties中spring.application.name参数配置的属性。
第二个值:ced24759781b822f,Spring Cloud Sleuth生成的一个ID,称为Trace ID,它用来标识一条请求链路。一条请求链路中包含一个Trace ID,多个Span ID。
第三个值:b2c24565840b3207,Spring Cloud Sleuth生成的另外一个ID,称为Span ID,它表示一个基本的工作单元,比如:发送一个HTTP请求。
第四个值:false,表示是否要将该信息输出到Zipkin等服务中来收集和展示。

上面四个值中的Trace ID和Span ID是Spring Cloud Sleuth实现分布式服务跟踪的核心。在一次服务请求链路的调用过程中,会保持并传递同一个Trace ID,从而将整个分布于不同微服务进程中的请求跟踪信息串联起来,以上面输出内容为例,sleuth-trace-1和sleuth-trace-2同属于一个前端服务请求来源,所以他们的Trace ID是相同的,处于同一条请求链路中。

跟踪原理

分布式系统中的服务跟踪在理论上并不复杂,它主要包括下面两个关键点:

  • 为了实现请求跟踪,当请求发送到分布式系统的入口端点时,只需要服务跟踪框架为该请求创建一个唯一的跟踪标识,同时在分布式系统内部流转的时候,框架始终保持传递该唯一标识,直到返回给请求方为止,这个唯一标识就是前文中提到的Trace ID。通过Trace ID的记录,我们就能将所有请求过程日志关联起来。
  • 为了统计各处理单元的时间延迟,当请求达到各个服务组件时,或是处理逻辑到达某个状态时,也通过一个唯一标识来标记它的开始、具体过程以及结束,该标识就是我们前文中提到的Span ID,对于每个Span来说,它必须有开始和结束两个节点,通过记录开始Span和结束Span的时间戳,就能统计出该Span的时间延迟,除了时间戳记录之外,它还可以包含一些其他元数据,比如:事件名称、请求信息等。

在项目中引入spring-cloud-starter-sleuth依赖之后, 它会自动的为当前应用构建起各通信通道的跟踪机制,比如:

  • 通过诸如RabbitMQ、Kafka(或者其他任何Spring Cloud Stream绑定器实现的消息中间件)传递的请求
  • 通过Zuul代理传递的请求
  • 通过RestTemplate发起的请求

之前的例子在发送到sleuth-trace-2之前会为在该请求的Header中增加实现跟踪需要的重要信息,主要有下面这几个(org.springframework.cloud.sleuth.Span的源码获取):

1
2
3
4
5
X-B3-TraceId:一条请求链路(Trace)的唯一标识,必须值
X-B3-SpanId:一个工作单元(Span)的唯一标识,必须值
X-B3-ParentSpanId::标识当前工作单元所属的上一个工作单元,Root Span(请求链路的第一个工作单元)的该值为空
X-B3-Sampled:是否被抽样输出的标志,1表示需要被输出,0表示不需要被输出
X-Span-Name:工作单元的名称

还可以在代码中获取这些信息:

1
2
3
4
5
6
@RequestMapping(value = "/trace2", method = RequestMethod.GET)
public String trace(HttpServletRequest request) {
logger.info("---------------call sleuth-trace-2 -------------------");
logger.info("----------------------"+ request.getHeader("X-B3-TraceId")+ request.getHeader("X-B3-SpanId"));
return "Trace";
}

再次运行:

1
2
---------------call sleuth-trace-2 -------------------
----------------------3f5b35a06e93292b4fac896c8cecede8
整合Logstash

ELK平台主要有由ElasticSearch、Logstash和Kiabana三个开源免费工具组成:

  • Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
  • Logstash是一个完全开源的工具,他可以对你的日志进行收集、过滤,并将其存储供以后使用。
  • Kibana 也是一个开源和免费的工具,它Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。

实现与负责日志收集的Logstash完成数据对接即可,所以我们需要为Logstash准备json格式的日志输出。由于Spring Boot应用默认使用了logback来记录日志,而Logstash自身也有对logback日志工具的支持工具,所以我们可以直接通过在logback的配置中增加对logstash的appender,就能非常方便的将日志转换成以json的格式存储和输出了。

在之前的基础上,实现面向logstash日志输出配置:在pom.xml依赖中引入logstash-logback-encoder依赖

在sleuth-trace-1创建配置文件,之前的配置,移到新创建的文件中。
这是因为:logback-spring.xml的加载在application.yml之前,所以之前的配置logback-spring.xml无法获取到spring.application.name属性,因此这里将该属性移动到最先加载的bootstrap配置文件中。

/resource目录下创建logback配置文件logback-spring.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<!--对logstash支持主要通过名为logstash的appender实现,主要是对日志信息的格式化处理,为了方便调试查看我们先将json日志输出到文件中。-->
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>

<springProperty scope="context" name="springAppName" source="spring.application.name"/>
<!-- 日志在工程中的输出位置 -->
<property name="LOG_FILE" value="${BUILD_FOLDER:-build}/${springAppName}"/>
<!-- 控制台的日志输出样式 -->
<property name="CONSOLE_LOG_PATTERN"
value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

<!-- 控制台Appender -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<!-- 为logstash输出的json格式的Appender -->
<appender name="logstash" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE}.json</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE}.json.%d{yyyy-MM-dd}.gz</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="logstash"/>
</root>
</configuration>

运行,访问后,工程目录下发现有一个build目录,下面分别创建了以各自应用名称命名的json文件,该文件就是在logback-spring.xml中配置的名为logstash的appender输出的日志文件。
除了这种方式生成json文件之外,也可以使用LogstashTcpSocketAppender将日志内容直接通过Tcp Socket输出到logstash服务端。

1
2
3
4
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:9250</destination>
...
</appender>
整合zipkin

利用ELK平台提供的收集、存储、搜索等强大功能,对跟踪信息的管理和使用已经变得非常便利。但是,在ELK平台中的数据分析维度缺少对请求链路中各阶段时间延迟的关注,如需要追溯请求链路找出整个调用链路中出现延迟过高的瓶颈源,亦或是为了实现对分布式系统做延迟监控等与时间消耗相关的需求,这时候类似ELK这样的日志分析系统就显得有些乏力。对于这样的问题,我们就可以引入Zipkin来得以轻松解决。

Zipkin是Twitter的一个开源项目,它基于Google Dapper实现。我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的REST API接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源。除了面向开发的API接口之外,它也提供了方便的UI组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,比如:可以查询某段时间内各用户请求的处理时间等。

Zipkin主要有4个核心组件构成:

  • Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为Zipkin内部处理的Span格式,以支持后续的存储、分析、展示等功能。
  • Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中。
  • RESTful API:API组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。
  • Web UI:UI组件,基于API组件实现的上层应用。通过UI组件用户可以方便而有直观地查询和分析跟踪信息。
    HTTP收集
    搭建Zipkin Server
    创建一个基础的Spring Boot应用,命名为zipkin-server,并在pom.xml中引入Zipkin Server的相关依赖。
    依赖:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
    <artifactId>spring-cloud-sleuth</artifactId>
    <groupId>com.hu</groupId>
    <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>zipkin-server</artifactId>

    <dependencies>
    <dependency>
    <groupId>io.zipkin.java</groupId>
    <artifactId>zipkin-server</artifactId>
    </dependency>
    <dependency>
    <groupId>io.zipkin.java</groupId>
    <artifactId>zipkin-autoconfigure-ui</artifactId>
    </dependency>
    </dependencies>
    </project>
    配置:
    1
    2
    3
    4
    5
    spring:
    application:
    name: zipkin-server
    server:
    port: 4343
    启动类:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @EnableZipkinServer//启动Zipkin Server
    @SpringBootApplication
    public class ZipkinApplication {

    public static void main(String[] args) {
    SpringApplication.run(ZipkinApplication.class, args);
    }

    }
    启动项目,就可以浏览。

完成了Zipkin Server的搭建之后,对之前的应用实现将跟踪信息输出到Zipkin Server。

在之前的两个应用中,接入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
</dependency>

配置文件中增加Zipkin 的地址spring.zipkin.base-url=http://localhost:3332

再次启动应用,收集的信息就会在Zipkin Server显示。

收集跟踪消息中间件

1、修改应用
Spring Cloud Sleuth在整合Zipkin时,不仅实现了以HTTP的方式收集跟踪信息,还实现了通过消息中间件来对跟踪信息进行异步收集的封装。通过结合Spring Cloud Stream,我们可以非常轻松的让应用客户端将跟踪信息输出到消息中间件上,同时Zipkin服务端从消息中间件上异步地消费这些跟踪信息。
为了让sleuth-trace-2和sleuth-trace-1在产生跟踪信息之后,能够将抽样记录输出到消息中间件中,我们除了需要之前引入的spring-cloud-starter-sleuth依赖之外,还需要引入zipkin对Spring Cloud Stream的扩展依赖spring-cloud-sleuth-stream以及基于Spring Cloud Stream实现的消息中间件绑定器依赖,以使用RabbitMQ为例,我们可以加入如下依赖:

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-stream</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

在配置中去掉HTTP方式实现时使用的spring.zipkin.base-url参数,并根据实际部署情况,增加消息中间件的相关配置。

2、修改zipkin-server服务端
让zipkin-server服务端能够从消息中间件中获取跟踪信息,我们只需要在pom.xml中引入针对消息中间件收集封装的服务端依赖spring-cloud-sleuth-zipkin-stream,同时为了支持具体使用的消息中间件,我们还需要引入针对消息中间件的绑定器实现,比如以使用RabbitMQ为例,我们可以在依赖中增加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>
</dependency>

其中,spring-cloud-sleuth-zipkin-stream依赖是实现从消息中间件收集跟踪信息的核心封装,其中包含了用于整合消息中间件的核心依赖、zipkin服务端的核心依赖、以及一些其他通常会被使用的依赖(比如:用于扩展数据存储的依赖、用于支持测试的依赖等)。但是,需要注意的是这个包里并没有引入zipkin的前端依赖zipkin-autoconfigure-ui,为了方便使用,我们在这里也引用了它。

把项目都启动后,可以在RabbitMQ的控制页面中看到一个名为sleuth的交换器,它就是zipkin的消息中间件收集器实现使用的默认主题。
当发送请求时,有被抽样收集的跟踪信息,我们可以在RabbitMQ的控制页面中发现有消息被发送到了sleuth交换器中,同时我们再到zipkin服务端的Web页面中也能够搜索到相应的跟踪信息。

收集原理

暂停

Spring Cloud Stream

发表于 2018-08-14 | 分类于 springcloud

Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架,为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点,实现了自动化配置的功能帮忙我们可以快速的上手使用,但是目前为止Spring Cloud Stream只支持RabbitMQ和Kafka

一个简单的demo
创建项目:spring-cloud-stream-demo,引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-stream</artifactId>
<groupId>com.hu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-stream-demo</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

配置文件:

1
2
3
4
5
6
7
8
9
server:
port: 8989

spring:
rabbitmq:
host: 192.168.1.10
port: 5672
username: guest
password: guest

消息消费者类:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 用于接收来自RabbitMQ消息的消费者
*/
@EnableBinding(Sink.class)
public class ConsumerDemo {
private static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

//输入通道
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
logger.info("Received: " + payload);
}
}

应用启动之后,观察日志,有:

1
2
3
4
main] o.s.i.codec.kryo.CompositeKryoRegistrar  : configured Kryo registration [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
main] o.s.integration.channel.DirectChannel : Channel 'application:8989.input' has 1 subscriber(s).
main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
[delegate=amqp://guest@192.168.1.10:5672/, localPort= 54920]

得知 ,系统guest用户创建了一个指向192.168.1.10:5672位置的RabbitMQ连接,在RabbitMQ的控制台中我们也可以发现它。

在控制台管理器中也可以看到。
声明了一个名为input.anonymous.bSbkR9fXS8utJPlEn2Cidw的队列,并通过RabbitMessageChannelBinder将自己绑定为它的消费者。

进入input.anonymous.bSbkR9fXS8utJPlEn2Cidw 通过Publish Message功能来发送一条消息到该队列中。

结果可以在当前启动的Spring Boot应用程序的控制台中看到:
[8utJPlEn2Cidw-1] com.hu.mq.ConsumerDemo : Received: [B@3b5ba131
输出的内容就是ConsumerDemo中receive方法定义的,输出的内容就是在控制台输入的信息。由于没有对消息进行序列化,所以看到的就对象的引用。

这是怎么通过消费消息以实现消息驱动业务逻辑的,解释如下:

首先,在应用做的就是引入spring-cloud-starter-stream-rabbit依赖,包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容,等价于spring-cloud-stream-binder-rabbit依赖。

这里用到的几个Spring Cloud Stream的核心注解,它们都被定义在SinkReceiver中:

@EnableBinding,该注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。在上面的例子中,我们通过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码如下:

1
2
3
4
5
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}

它通过@Input注解绑定了一个名为input的通道。除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口,实际使用时我们也可以自己通过@Input和@Output注解来定义绑定消息通道的接口。当我们需要为@EnableBinding指定多个接口来绑定消息通道的时候,可以这样定义:@EnableBinding(value = {Sink.class, Source.class})。

@StreamListener:该注解主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。在上面的例子中,我们通过@StreamListener(Sink.INPUT)注解将receive方法注册为对input消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会做出对应的响应动作。

之前是控制台完成了发送消息来验证了消息消费程序的功能,现在做一个单元测试方式发送消息。

创建单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 将@EnableBinding应用到spring应用的一个配置类中,可以将spring应用变成Spring Cloud Stream应用。
*
* 注解@EnableBinding本身就包含@Configuration注解,并且会触发Spring Cloud Stream 基本配置。
* 可以接收一个或多个接口类作为对象,后者包含代表了可绑定构件(一般来说是消息通道)的方法。
*/
@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkApplicationTests.SinkSender.class})
public class SinkApplicationTests {

@Autowired
private SinkSender sinkSender;

@Test
public void sinkSenderTester() {
sinkSender.output().send(MessageBuilder.withPayload("---------------hello-------------").build());
}

public interface SinkSender {
//定义了一个输出通道,与ConsumerDemo中定义的消费通道同名,这样与之前的消费者程序组成了一对生产者与消费者。
@Output(Sink.INPUT)
MessageChannel output();
}
}

运行后,消费端就输出结果了

1
[8W8kbDiQJj4QA-1] com.hu.mq.ConsumerDemo                   : Received: ---------------hello-------------
Spring Cloud Stream应用模型的结构图

Spring Cloud Stream构建的应用程序与消息中间件之间是通过绑定器 Binder相关联的,绑定器对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对应用程序来说是透明的。所以对于每一个Spring Cloud Stream的应用程序来说,它不需要知晓消息中间件的通信细节,它只需要知道 Binder对应用程序提供的概念去实现即可,而这个概念就是在快速入门中我们提到的消息通道: Channel

绑定器

Binder绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑。这一点在上一章实现消息总线时,从RabbitMQ切换到Kafka的过程中,已经能够让我们体验到这一好处。

发布-订阅模式

在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的Topic主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,Topic可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。

在之前的而例子,通过RabbitMQ的Channel进行发布消息给我们编写的应用程序消费,实际上Spring Cloud Stream应用启动的时候,在RabbitMQ的Exchange中也创建了一个名为input的Exchange交换器,由于Binder的隔离作用,应用程序并无法感知它的存在,应用程序只知道自己指向Binder的输入或是输出通道。

在发布-订阅模式中,消息被分发到多个订阅者,例如,把之前的例子,通过命令行的方式启动两个不同端口的进程。此时,在RabbitMQ控制页面的Channels标签页中看到如下图所示的两个消息通道,它们分别绑定了启动的两个应用程序。
把项目install,生成jar,进入目录,在控制台通过命令启动另外一个实例,指定端口。
java -jar spring-cloud-stream-demo-1.0-SNAPSHOT.jar -server.port 2345

而在Exchanges标签页中,我们还能找到名为input的交换器,点击进入可以看到如下图所示的详情页面,其中在Bindings中的内容就是两个应用程序绑定通道中的消息队列,我们可以通过Exchange页面的Publish Message来发布消息,此时可以发现两个启动的应用程序都输出了消息内容。

相对于点对点队列实现的消息通信来说,Spring Cloud Stream采用的发布-订阅模式可以有效的降低消息生产者与消费者之间的耦合,当我们需要对同一类消息增加一种处理方式时,只需要增加一个应用程序并将输入通道绑定到既有的Topic中就可以实现功能的扩展,而不需要改变原来已经实现的任何内容。

消费组

虽然Spring Cloud Stream通过发布-订阅模式将消息生产者与消费者做了很好的解耦,基于相同主题的消费者可以轻松的进行扩展,但是这些扩展都是针对不同的应用实例而言的,在现实的微服务架构中,我们每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例。

消息生产者发送消息给某个具体微服务时,只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。

如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过spring.cloud.stream.bindings.input.group属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正的收到消息并进行处理。

默认情况下,Spring Cloud Stream会为其分配一个独立的匿名消费组。所以,如果同一主题下所有的应用都没有指定消费组的时候,当有消息被发布之后,所有的应用都会对其进行消费,因为它们各自都属于一个独立的组中。大部分情况下,我们在创建Spring Cloud Stream应用的时候,建议最好为其指定一个消费组,以防止对消息的重复处理,除非该行为需要这样做(比如:刷新所有实例的配置等)。

使用消费组实现消息消费的负载均衡
先创建一个消费者应用,实现了greetings主题上的输入通道绑定;
spring.cloud.stream.bindings.input.group=Service-A #指定了该应用实例都属于Service-A消费组
spring.cloud.stream.bindings.input.destination=greetings #指定了输入通道对应的主题名
完成了消息消费者之后,我们再来实现一个消息生产者应用;
它的输出通道绑定目标也指向greetings主题
spring.cloud.stream.bindings.output.destination=greetings

消息分区

通过引入消费组的概念,已经能够在多实例的情况下,保障每个消息只被组内一个实例进行消费。
通过上面对消费组参数设置后的实验,消费组并无法控制消息具体被哪个实例消费。也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。但是对于一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理,比如:一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身内容聚合这些数据,那么消息生产者可以为消息增加一个固有的特征ID来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。而分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。

Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间件的上层实现分区处理,所以它对于消息中间件自身是否实现了消息分区并不关心,这使得Spring Cloud Stream为不具备分区功能的消息中间件也增加了分区功能扩展。

在消费者应用,增加配置:

1
2
3
spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能;
spring.cloud.stream.instanceCount:该参数指定了当前消费者的总实例数量;
spring.cloud.stream.instanceIndex:该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount参数 - 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。

在生产者应用,增加配置:

1
2
spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
spring.cloud.stream.bindings.output.producer.partitionCount:该参数指定了消息分区的数量。

消息分区配置完成,启动生产者,同时消费者启动多个,注意为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。

Spring Cloud Eureka

发表于 2018-08-14 | 分类于 springcloud

Spring Cloud Eureka是Spring Cloud Netflix项目下的服务治理模块。而Spring Cloud Netflix项目是Spring Cloud的子项目之一,主要内容是对Netflix公司一系列开源产品的包装,它为Spring Boot应用提供了自配置的Netflix OSS整合。通过一些简单的注解,开发者就可以快速的在应用中配置一下常用模块并构建庞大的分布式系统。它主要提供的模块包括:服务发现(Eureka),断路器(Hystrix),智能路由(Zuul),客户端负载均衡(Ribbon)等。
创建一个项目spring-cloud-eureka
引入子模块需要的依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hu</groupId>
<artifactId>spring-cloud-eureka</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spring-cloud-eureka-server</module>
<module>spring-cloud-eureka-provider</module>
<module>spring-cloud-eureka-consumer</module>
<module>spring-cloud-eureka-consumer-feign</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/>
</parent>
<!-- 设置jdk版本等信息 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<!-- dependency management -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

</project>
创建服务注册中心

在Idea中创建一个Eureka服务子模块,命名为spring-cloud-eureka-server,并在pom.xml中引入需要的依赖内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-eureka</artifactId>
<groupId>com.hu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-eureka-server</artifactId>

<!-- 导入eureka依赖 -->
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>

</dependencies>
</project>

创建 启动类:

1
2
3
4
5
6
7
8
@EnableEurekaServer//表示是一个eurekaserver
@SpringBootApplication
public class EurekaServerApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(EurekaServerApplication.class).web(true).run(args);
}
}

增加配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
security:
basic:
enabled: true
user:
name: user
password: 123456
server:
port: 8761
eureka:
instance:
hostname: localhost
client:
register-with-eureka: false
fetch-registry: false
service-url:
defaultZone: http://user:123456@localhost:8761/eureka
logging:
level:
创建服务提供方

创建一个服务spring-cloud-eureka-provider,引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-eureka</artifactId>
<groupId>com.hu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-eureka-provider</artifactId>
<dependencies>
<!--boot eureka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!-- boot web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

</project>

创建启动类:

1
2
3
4
5
6
7
8
@EnableEurekaClient
@SpringBootApplication
public class ProviderApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(ProviderApplication.class).web(true).run(args);
}
}

创建配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 7900
spring:
application:
name: provider-hello
eureka:
client:
serviceUrl:
defaultZone: http://user:123456@localhost:8761/eureka
logging:
level:
root: INFO
#通过spring.application.name属性,我们可以指定微服务的名称后续在调用的时候只需要使用该名称就可以进行服务的访问。
#eureka.client.serviceUrl.defaultZone属性对应服务注册中心的配置内容,指定服务注册中心的位置。
#为了在本机上测试区分服务提供方和服务注册中心,使用server.port属性设置不同的端口。

创建服务 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
public class ProviderController {
private static String[] users = {"5","2","3","1"};
@RequestMapping(value = "/users",method = RequestMethod.GET)
public String[] users(){
return users;
}
@RequestMapping(value = "/users/{id}",method = RequestMethod.GET)
public String users(@PathVariable String id){
for (int i=0;i<users.length;i++) {
if (id!=null&&id.equals(users[i])) {
return users[i];
}
}
return null;
}
}

启动项目后,浏览器查看注册中心,可以看到服务已经被注册到注册中心。
同样方式创建服务的消费者

创建服务消费方

创建子模块spring-cloud-eureka-consumer,引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-eureka</artifactId>
<groupId>com.hu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-eureka-consumer</artifactId>

<dependencies>
<!--boot eureka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!-- boot web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>

创建配置:

1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 7902
spring:
application:
name: consumer-hello
eureka:
client:
serviceUrl:
defaultZone: http://user:123456@localhost:8761/eureka
logging:
level:
root: INFO

创建启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 创建应用主类
* 初始化RestTemplate,用来真正发起REST请求。
* 注解@EnableDiscoveryClient注解用来将当前应用加入到服务治理体系中。
*/
@EnableEurekaClient
@SpringBootApplication
public class ConsumerApplication {
/**
* 通过注解 @LoadBalanced 使用Ribbon负载均衡
* Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的工具。
* 这样就可以在Controller中,去掉通过LoadBalancerClient选取实例和拼接URL的步骤,直接通过RestTemplate发起请求。
* 请求的host位置并没有使用一个具体的IP地址和端口的形式,而是采用了服务名的方式组成,不需要具体的IP和端口。因为:
* Spring Cloud Ribbon有一个拦截器,它能够在这里进行实际调用的时候,自动的去选取服务实例,并将实际要请求的IP地址和端口替换这里的服务名,从而完成服务接口的调用。
* @return
*/
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
/*开启多个provider实例,
可以采用修改yml文件 中的端口号重复启动实例,也可以另外新建多个provider,修改其中的yml文件,依次启动。
由于eureka配置类ribbon负载均衡策略,消费者的请求会在客户端被决定好发送到哪台服务提供者进行处理。*/
}

创建控制器,控制器中调用了注册中心的提供者服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
public class ConsumerController {
@Autowired
private RestTemplate resttemplate;
@Autowired
LoadBalancerClient loadBalancerClient;

@RequestMapping(value = "/users",method = RequestMethod.GET)
public String[] users(){
//不使用负载
//ServiceInstance serviceInstance = loadBalancerClient.choose("provider-hello");
//String url = "http://" + serviceInstance.getHost() + ":" + serviceInstance.getPort() + "/users";

//使用负载
String url="http://provider-hello/users";

//返回值类型和我们的业务返回值一致
return resttemplate.getForObject(url, String[].class);
}
@RequestMapping(value = "/users/{id}",method = RequestMethod.GET)
public String users(@PathVariable String id){
System.out.println("-----------------------"+id);
String url="http://provider-hello/users/"+id;
return resttemplate.getForObject(url, String.class);
}
}

运行,浏览器访问控制器,获取了服务提供者的信息。

Spring Cloud Consul

Spring Cloud Consul项目是针对Consul的服务治理实现。Consul是一个分布式高可用的系统,它包含多个组件,包含了下面几个特性:

  • 服务发现
  • 健康检查
  • Key/Value存储
  • 多数据中心

基于Spring Boot的微服务应用注册到Consul上,实现微服务架构中的服务治理。
以之前实现的基于Eureka的示例为基础,只需要在pom.xml中将eureka的依赖修改为如下依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>

再修改配置文件:

1
2
3
4
5
spring: 
cloud:
consul:
host: localhost
port: 8500

这样就把基于Eureka的服务换为基于consul服务治理的服务提供者;
Spring cloud服务发现的接口DiscoveryClient是Spring Cloud对服务治理做的一层抽象,所以可以屏蔽Eureka和Consul服务治理的实现细节,只需要引入不同的服务治理依赖,并配置相关的配置属性即可。
consul是服务端不需要像Eureka那种创建,直接下载consul的服务端程序就可以使用。

Spring Cloud Ribbon

Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的工具。它是一个基于HTTP和TCP的客户端负载均衡器。它可以通过在客户端中配置ribbonServerList来设置服务端列表去轮询访问以达到均衡负载的作用。

  • 当Ribbon与Eureka联合使用时,ribbonServerList会被DiscoveryEnabledNIWSServerList重写,扩展成从Eureka注册中心中获取服务实例列表。同时它也会用NIWSDiscoveryPing来取代IPing,它将职责委托给Eureka来确定服务端是否已经启动。
  • 当Ribbon与Consul联合使用时,ribbonServerList会被ConsulServerList来扩展成从Consul获取服务实例列表。同时由ConsulPing来作为IPing接口的实现。

不论是与Eureka还是Consul结合,都可以快速在Spring Cloud中实现服务间调用的负载均衡。
在之前的模块中,引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
</dependency>

修改类增加@LoadBalanced注解(其实前面已经使用了):

1
2
3
4
5
@Bean
@LoadBalanced
public RestTemplate restTemplate(){
return new RestTemplate();
}

去掉通过LoadBalancerClient选取实例和拼接URL的步骤,直接通过RestTemplate发起请求。

1
return resttemplate.getForObject(url, String[].class);

这样就实现了一个简单的Ribbon负载均衡。

Spring Cloud Feign

Spring Cloud Feign是一套基于Netflix Feign实现的声明式服务调用客户端。它使得编写Web服务客户端变得更加简单。我们只需要通过创建接口并用注解来配置它既可完成对Web服务接口的绑定。它具备可插拔的注解支持,包括Feign注解、JAX-RS注解。它也支持可插拔的编码器和解码器。Spring Cloud Feign还扩展了对Spring MVC注解的支持,同时还整合了Ribbon和Eureka来提供均衡负载的HTTP客户端实现。
新建一个模块spring-cloud-eureka-consumer-feign,引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>

启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Spring Cloud Feign是一套基于Netflix Feign实现的声明式服务调用客户端。它使得编写Web服务客户端变得更加简单。
* 我们只需要通过创建接口并用注解来配置它既可完成对Web服务接口的绑定。
* 它具备可插拔的注解支持,包括Feign注解、JAX-RS注解。它也支持可插拔的编码器和解码器。
*
* Spring Cloud Feign还扩展了对Spring MVC注解的支持,同时还整合了Ribbon和Eureka来提供均衡负载的HTTP客户端实现。
*
*/
@EnableEurekaClient
@SpringBootApplication
@EnableFeignClients //修改应用主类。通过@EnableFeignClients注解开启扫描Spring Cloud Feign客户端的功能
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}

创建一个Feign的客户端接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 创建一个Feign的客户端接口定义。
* 使用@FeignClient注解来指定这个接口所要调用的服务名称,
*
* 接口中定义的各个函数使用Spring MVC的注解就可以来绑定服务提供方的REST接口
*/
@Service
@FeignClient("provider-hello")
public interface HelloFeignClient {
@RequestMapping(value = "/users",method = RequestMethod.GET)
String[] users();
@RequestMapping(value = "/users/{id}",method = RequestMethod.GET)
String users(@PathVariable("id") String id);
}

控制器直接通过接口去掉服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 通过Spring Cloud Feign来实现服务调用的方式更加简单了,通过@FeignClient定义的接口来统一的生命我们需要依赖的微服务接口。
* 而在具体使用的时候就跟调用本地方法一点的进行调用即可。
*
* 由于Feign是基于Ribbon实现的,所以它自带了客户端负载均衡功能,也可以通过Ribbon的IRule进行策略扩展。
* 另外,Feign还整合的Hystrix来实现服务的容错保护,在Dalston版本中,Feign的Hystrix默认是关闭的。
*/
@RestController
public class ConsumerController {
@Autowired
HelloFeignClient helloFeignClient;

@RequestMapping(value = "/users",method = RequestMethod.GET)
public String[] users(){
return helloFeignClient.users();
}
@RequestMapping(value = "/users/{id}",method = RequestMethod.GET)
public String users(@PathVariable("id") String id){
return helloFeignClient.users(id);
}
}

这样就完成了简单的整合调用。

springcloud介绍

发表于 2018-08-14 | 分类于 springcloud

Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的开发便利性简化了分布式系统的开发,比如服务发现、服务网关、服务路由、链路追踪等。Spring Cloud 并不重复造轮子,而是将市面上开发得比较好的模块集成进去,进行封装,从而减少了各模块的开发成本。换句话说:Spring Cloud 提供了构建分布式系统所需的”全家桶”。

其主要优点有:

  • 集大成者,Spring Cloud 包含了微服务架构的方方面面。
  • 约定优于配置,基于注解,没有配置文件。
  • 轻量级组件,Spring Cloud 整合的组件大多比较轻量级,且都是各自领域的佼佼者。
  • 开发简便,Spring Cloud 对各个组件进行了大量的封装,从而简化了开发。
  • 开发灵活,Spring Cloud 的组件都是解耦的,开发人员可以灵活按需选择组件。

缺点:

  • 项目结构复杂,每一个组件或者每一个服务都需要创建一个项目。
  • 部署门槛高,项目部署需要配合 Docker 等容器技术进行集群部署,而要想深入了解 Docker,学习成本高。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1、Spring Cloud Config 配置中心,利用git集中管理程序的配置。 

2、Spring Cloud Netflix 集成众多Netflix的开源软件
3、Spring Cloud Bus 消息总线,利用分布式消息将服务和服务实例连接在一起,用于在一个集群中传播状态的变化
4、Spring Cloud for Cloud Foundry 利用Pivotal Cloudfoundry集成你的应用程序
5、Spring Cloud Cloud Foundry Service Broker 为建立管理云托管服务的服务代理提供了一个起点。
6、Spring Cloud Cluster 基于Zookeeper, Redis, Hazelcast, Consul实现的领导选举和平民状态模式的抽象和实现。
7、Spring Cloud Consul 基于Hashicorp Consul实现的服务发现和配置管理。
8、Spring Cloud Security 在Zuul代理中为OAuth2 rest客户端和认证头转发提供负载均衡
9、Spring Cloud Sleuth SpringCloud应用的分布式追踪系统,和Zipkin,HTrace,ELK兼容。
10、Spring Cloud Data Flow 一个云本地程序和操作模型,组成数据微服务在一个结构化的平台上。
11、Spring Cloud Stream 基于Redis,Rabbit,Kafka实现的消息微服务,简单声明模型用以在Spring Cloud应用中收发消息。
12、Spring Cloud Stream App Starters 基于Spring Boot为外部系统提供spring的集成
13、Spring Cloud Task 短生命周期的微服务,为SpringBooot应用简单声明添加功能和非功能特性。
14、Spring Cloud Task App Starters
15、Spring Cloud Zookeeper 服务发现和配置管理基于Apache Zookeeper。
16、Spring Cloud for Amazon Web Services 快速和亚马逊网络服务集成。
17、Spring Cloud Connectors 便于PaaS应用在各种平台上连接到后端像数据库和消息经纪服务。
18、Spring Cloud Starters (项目已经终止并且在Angel.SR2后的版本和其他项目合并)
19、Spring Cloud CLI 插件用Groovy快速的创建Spring Cloud组件应用。
上一页1…121314…25下一页
初晨

初晨

永远不要说你知道本质,更别说真相了。

249 日志
46 分类
109 标签
近期文章
  • WebSocket、Socket、TCP、HTTP区别
  • Springboot项目的接口防刷
  • 深入理解Volatile关键字及其实现原理
  • 使用vscode搭建个人笔记环境
  • HBase介绍安装与操作
© 2018 — 2020 Copyright
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4