简

人生短暂,学海无边,而大道至简。


  • 首页

  • 归档

  • 分类

  • 标签

NodeJS安装配置及Express基本介绍

发表于 2019-04-01 | 分类于 docker

Node.js简介

简单的说 Node.js 就是运行在服务端的 JavaScript。Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。Node.js 使用了一个事件驱动、非阻塞式 I/O 的模型,使其轻量又高效。Node.js 的包管理器 npm,是全球最大的开源库生态系统。

1
2
3
安装环境
本机系统:Windows 7
NODEJS版本:node-v10.14.0-x64

步骤

1
2
3
4
1、下载对应你系统的Node.js版本:https://nodejs.org/en/download/
2、选安装目录进行安装
3、环境配置
4、测试

下载

安装

Windows上安装时务必选择全部组件,包括勾选Add to Path,更改自己需要的安装目录。

安装完成后安装目录有nodejs的文件:

测试安装是否成功了:【win+R】键,输入cmd,然后回车,打开cmd窗口。

分别输入:

1
2
node -v
npm -v

显示出版本信息,说明安装成功!

NPM

npm是什么?

npm是Node.js的包管理工具(package manager)。npm的作用就是对Node.js依赖的包进行管理,也可以理解为用来安装/卸载Node.js需要装的东西,在Node.js上开发时,会用到很多别人写的JavaScript代码包。如果我们要使用别人的包,每次都要去搜索——下载——解压——使用,这样非常麻烦。于是出现了这个集中管理的工具,大家都把自己开发的模块打包后放到npm官网上,如果别人需要,直接通过npm安装就可以使用了。
npm还可以根据包的依赖关系,把所有依赖的包都下载并管理起来。新版的Node.js已自带npm,安装Node.js时会一起安装。

配置

主要配置的是npm安装的全局模块所在的路径,以及缓存cache的路径。是因为以后在执行类似:npm install express [-g](后面的可选参数-g,g代表global全局安装的意思)的安装语句时,会将安装的模块安装到【C:\Users\用户名\AppData\Roaming\npm】默认路径中。

将全模块所在路径和缓存路径放在我node.js安装的文件夹中,再安装目录下创建两个文件夹【node_global】及【node_cache】:

创建完两个空文件夹之后,打开cmd命令窗口,输入:

1
2
npm config set prefix "XXXXXX\nodejs\node_global
npm config set cache "XXXXX\nodejs\node_cache

XXXXX是你nodejs文件路径

设置环境变量

  window7等老版本windows的编辑环境变量的方式,很麻烦,一不小心,漏了个分号,都会弄得自己怀疑人生。

  下载了 Rapid Environment Editor 这个小工具,就可以很方便的在window7及以下版本编辑环境变量了。

  下载地址:https://www.rapidee.com/en/download 按计算机是32还是64位下载对应版本

打开软件

【系统变量】下右键新建一个变量【NODE_PATH】,值为C:\AdministratorProgramFiles\nodejs\node_modules

【用户变量】下的【Path】中的这一项C:\Users\用户名\AppData\Roaming\npm 修改为C:\AdministratorProgramFiles\nodejs\node_global

配置完后,安装个module测试下,我们就安装最常用的express模块,打开cmd窗口,
输入如下命令进行模块的全局安装:

npm install express -g# -g是全局安装的意思

安装完成后可以看到C:\AdministratorProgramFiles\nodejs\node_global\node_modules下有express模块

测试

此时,nodejs安装环境配置完毕,由于nodejs是运行在服务端的,所以编写的JavaScript代码将不能在浏览器环境中执行了,而是在Node环境中执行。

因此,JavaScript代码将直接在你的计算机上以命令行的方式运行,所以,我们要先选择一个文本编辑器来编写JavaScript代码,并且把它保存到本地硬盘的某个目录,才能够执行。

 用电脑上安装的编辑器编写代码测试。我用Notepad++,注意用UTF-8格式保存。

输入以下代码:

1
console.log('Hello, world!');

第一行写上'use strict',是因为我们总是以严格模式运行JavaScript代码,避免各种潜在陷阱。

然后,选择一个目录保存为helloworld.js,必须要以.js结尾,就可以打开命令行窗口,把当前目录切换到helloworld.js所在目录,运行这个程序:

1
2
G:\NodeJs node helloworld.js
Hello, world!

至此,NodeJS下载安装配置环境,测试都完成。

Express

Express是基于Nodejs的官方Web开发库,每年升级一个大版本,在Express4时,替换掉中件间库connect,而改用多个更细粒度的库来取代。
首先,我们需要安装express库。在Express3.6.x之前的版本,Express需要全局安装的,项目构建器模块是合并在Express项目中的,后来这个构建器被拆分出来,独立成为了一个项目express-generator,现在我们只需要全局安装express-generator项目就行了。

1
npm install -g express-generator@4  #全局安装-g

安装后可以看到全局模块:

IDE工程结构

使用WebStorm创建一个基于Express项目:

目录结构:

1
2
3
4
5
6
7
bin, 存放启动项目的脚本文件
node_modules, 存放所有的项目依赖库。
public,静态文件(css,js,img)
routes,路由文件(MVC中的C,controller)
views,页面文件(Ejs模板)
package.json,项目依赖配置及开发者信息
app.js,应用核心配置文件

package.json项目配置

package.json用于项目依赖配置及开发者信息,scripts属性是用于定义操作命令的,可以非常方便的增加启动命令,比如默认的start,用npm start代表执行node ./bin/www命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"name": "express-study",
"version": "0.0.0",
"private": true,
"scripts": {
"start": "node ./bin/www"
},
"dependencies": {
"cookie-parser": "~1.4.4",
"debug": "~2.6.9",
"express": "~4.16.1",
"http-errors": "~1.6.3",
"morgan": "~1.9.1",
"pug": "2.0.0-beta11"
}
}

app.js核心文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 加载依赖库,原来这个类库都封装在connect中,现在需地注单独加载
var createError = require('http-errors');
var express = require('express');
var path = require('path');
var cookieParser = require('cookie-parser');
var logger = require('morgan');

// 加载路由控制
var indexRouter = require('./routes/index');
var usersRouter = require('./routes/users');

// 创建项目实例
var app = express();

// 定义pug模板引擎和模板文件位置,也可以使用ejs或其他模型引擎
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'pug');

// 定义日志和输出级别
app.use(logger('dev'));
// 定义数据解析器
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
// 定义cookie解析器
app.use(cookieParser());
// 定义静态文件目录
app.use(express.static(path.join(__dirname, 'public')));

// 匹配路径和路由
app.use('/', indexRouter);
app.use('/users', usersRouter);

// catch 404 and forward to error handler
app.use(function(req, res, next) {
next(createError(404));
});


// error handler
app.use(function(err, req, res, next) {
// set locals, only providing error in development
res.locals.message = err.message;
res.locals.error = req.app.get('env') === 'development' ? err : {};

// render the error page
res.status(err.status || 500);
res.render('error');
});

// 输出模型app
module.exports = app;

www文件也是一个node的脚本,用于分离配置和启动程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env node

/**
* Module dependencies.
* 加载依赖
*/

var app = require('../app');
var debug = require('debug')('express-study:server');
var http = require('http');

/**
* Get port from environment and store in Express.
* 定义启动端口
*/

var port = normalizePort(process.env.PORT || '3000');
app.set('port', port);

/**
* Create HTTP server.
* 创建HTTP服务器实例
*/

var server = http.createServer(app);

/**
* Listen on provided port, on all network interfaces.
* 启动网络服务监听端口
*/

server.listen(port);
server.on('error', onError);
server.on('listening', onListening);

/**
* Normalize a port into a number, string, or false.
* 端口标准化函数
*/

function normalizePort(val) {
var port = parseInt(val, 10);

if (isNaN(port)) {
// named pipe
return val;
}

if (port >= 0) {
// port number
return port;
}

return false;
}

/**
* Event listener for HTTP server "error" event.
* HTTP异常事件处理函数
*/

function onError(error) {
if (error.syscall !== 'listen') {
throw error;
}

var bind = typeof port === 'string'
? 'Pipe ' + port
: 'Port ' + port;

// handle specific listen errors with friendly messages
switch (error.code) {
case 'EACCES':
console.error(bind + ' requires elevated privileges');
process.exit(1);
break;
case 'EADDRINUSE':
console.error(bind + ' is already in use');
process.exit(1);
break;
default:
throw error;
}
}

/**
* Event listener for HTTP server "listening" event.
* 事件绑定函数
*/

function onListening() {
var addr = server.address();
var bind = typeof addr === 'string'
? 'pipe ' + addr
: 'port ' + addr.port;
debug('Listening on ' + bind);
}

运行浏览:

接下来,我们把index.ejs页面切分成3个部分:header.ejs, index.ejs, footer.ejs,用于网站页面的模块化。

header.ejs, 为页面的头部区域

1
2
3
4
5
6
7
8
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<title><%= title %></title>
<link rel="stylesheet" href="http://cdn.bootcss.com/bootstrap/3.3.2/css/bootstrap.min.css">
<link rel='stylesheet' href='/stylesheets/style.css' />
</head>
<body>

index.ejs, 为内容显示区域

1
2
3
4
5
6
7
8
<% include header.ejs %>

<div class="well jumbotron">
<h1><%= title %></h1>
<p>hello</p>
</div>

<% include footer.ejs %>

footer.ejs,为页面底部区域

1
2
3
4
<script src="http://cdn.bootcss.com/jquery/1.11.2/jquery.min.js"></script>
<script src="http://cdn.bootcss.com/bootstrap/3.3.2/js/bootstrap.min.js"></script>
</body>
</html>

路由功能

路由功能,是Express4以后全面改版的功能。在应用程序加载隐含路由中间件.

1
2
app.route()函数,创建可链接的途径处理程序的路由路径。
express.Router类,创建模块化安装路径的处理程序。

app.route方法会返回一个Route实例,它可以继续使用所有的HTTP方法,包括get,post,all,put,delete,head等。

1
2
3
app.route('/users')
.get(function(req, res, next) {})
.post(function(req, res, next) {})

express.Router类,则可以帮助我们更好的组织代码结构。
在app.js文件中,定义了app.use(‘/’, routes); routes是指向了routes目录下的index.js文件,./routes/index.js文件中,express.Router被定义使用,路径/*处理都会由routes/index.js文件里的Router来处理。如果我们要管理不同的路径,那么可以直接配置为多个不同的Router。

app.use(‘/user’, require(‘./routes/user’).user);
app.use(‘/admin’, require(‘./routes/admin’).admin);
app.use(‘/‘, require(‘./routes’));

使用消息机制补偿处理实例

发表于 2019-03-23 | 分类于 分布式

分布式环境用,可以使用补偿机制实现解决一些问题,如通过下面发送邮件例子:

创建消息生产者

创建工程,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hu</groupId>
<artifactId>descout-buchang-producer</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.32</version>
</dependency>
</dependencies>
</project>

配置:

1
2
3
4
5
6
spring:
rabbitmq:
host: 192.168.6.238
port: 5672
username: guest
password: guest
1
2
3
4
5
6
@SpringBootApplication
public class ProcucerApplication {
public static void main(String[] args){
SpringApplication.run(ProcucerApplication.class,args);
}
}

rabbitmq配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Component
public class FanoutConfig {

// 邮件队列
private String FANOUT_EMAIL_QUEUE = "email_queue";

// 短信队列
private String FANOUT_SMS_QUEUE = "sms_queue";

// fanout 交换机
private String EXCHANGE_NAME = "fanoutExchange";

// 1.定义邮件队列
@Bean
public Queue fanOutEamilQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}

// 2.定义短信队列
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}

// 2.定义交换机
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}

// 3.队列与交换机绑定邮件队列
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}

// 4.队列与交换机绑定短信队列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
}

发消息到消息队列,说明发邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;

public void send(String msg) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "404914989@qq.com");
jsonObject.put("msg",msg);
jsonObject.put("Time", System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
System.out.println("jsonString:" + jsonString);
amqpTemplate.convertAndSend("email_queue", jsonString);
}
}

控制器

1
2
3
4
5
6
7
8
9
10
11
@RestController
public class ProducerController {
@Autowired
private FanoutProducer fanoutProducer;

@RequestMapping("/sendFanout")
public String sendFanout(String msg) {
fanoutProducer.send(msg);
return "success";
}
}
消费消息

消费消息,远程调邮件服务发邮件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hu</groupId>
<artifactId>descout-buchang-consumer</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>

</dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
rabbitmq:
host: 192.168.6.238
port: 5672
username: guest
password: guest
listener:
simple:
retry:
####开启消费者异常重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 2000
server:
port: 8081
1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class,args);
}

@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}

获取发邮件消息,之后调用邮件服务器发邮件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class FanoutEamilConsumer {
@Autowired
RestTemplate restTemplate;
@RabbitListener(queues = "email_queue")
public void process(String msg) throws Exception {
System.out.println("邮件消费者获取生产者消息msg:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String emailUrl = "http://192.168.6.238:8083/sendEmail?email=" + email;
JSONObject result = restTemplate.getForObject(emailUrl,JSONObject.class);
if (result == null) {
throw new Exception("调用接口失败!");
}
System.out.println("执行结束....");

}
}
邮件服务

下面模拟邮件服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hu</groupId>
<artifactId>descout-buchang-emailser</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
public class MsgController {

// 模拟发送邮件
@RequestMapping("/sendEmail")
public Map<String, Object> sendEmail(String email) {
System.out.println("开始发送邮件:" + email);
Map<String, Object> result = new HashMap<String, Object>();
result.put("code", "200");
result.put("msg", "发送邮件成功..");
System.out.println("发送邮件成功");
return result;
}
}

启动邮件服务和生产者服务,浏览器中输入:http://192.168.6.238:8080/sendFanout?msg=0001
可以看到消息队列中有消息:

20200423155603

启动消费者服务:

20200423155620
服务消息被消费,并且消费者服务日志已经获取消息,并且调用邮件服务来发送邮件,而且邮件服务也有了日志信息。

1
2
 INFO 14684 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#3b152928:0/SimpleConnection@61b9a318 [delegate=amqp://guest@192.168.6.238:5672/, localPort= 59727]
邮件消费者获取生产者消息msg:{"msg":"0001","Time":1564124703209,"email":"404914989@qq.com"}
1
2
开始发送邮件:404914989@qq.com
发送邮件成功

20200423155636

假如邮件服务器未启动,消费调用接口失败 会一直重试 重试五次,再次之间,如果邮件服务器启动成功 则重试成功 不再重试 不再进行补偿机制。
停止邮件服务器,http://192.168.6.238:8080/sendFanout?msg=456541再发送一次消息,可以看到消费者日志:

1
2
3
4
5
6
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
WARN 14684 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=email_queue, deliveryTag=4, consumerTag=amq.ctag-y3aGTKHoCsx2ydYRm4fflA, consumerQueue=email_queue])

服务器一直没有启动,重复消费了5次,之后不再补偿,并且有了异常信息。

网络延迟传输中,或者消费出现异常或者是消费延迟,会造成进行MQ重试进行重试补偿机制,在重试过程中,可能会造成重复消费。
如果不需要被重复消费: 使用全局MessageID判断消费,解决幂等性。

解决方式:
生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;

public void send(String msg) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "404914989@qq.com");
jsonObject.put("msg",msg);
jsonObject.put("Time", System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
System.out.println("jsonString:" + jsonString);
// 设置消息唯一id 保证每次重试消息id唯一
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "").build();
amqpTemplate.convertAndSend("email_queue", jsonString);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void process(Message message) throws Exception {
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("messageId:" + messageId + ",消息内容:" + msg);
if (messageId == null) {
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
JSONObject result = restTemplate.getForObject(emailUrl,JSONObject.class);
if (result == null) {
throw new Exception("调用接口失败!");
}
System.out.println("执行结束....");
}

Spring boot 中还可以进行 AOP拦截 自动帮助做重试
如果不告诉服务器已经消费成功,则服务器不会删除 消息。告诉消费成功了才会删除。
消费者的yml加入:acknowledge-mode: manual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
rabbitmq:
host: 192.168.6.238
port: 5672
username: guest
password: guest
listener:
simple:
retry:
####开启消费者异常重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 2000
#开启手动ack
acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("messageId:" + messageId + ",消息内容:" + msg);
if (messageId == null) {
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String emailUrl = "http://192.168.6.238:8083/sendEmail?email=" + email;
JSONObject result = restTemplate.getForObject(emailUrl,JSONObject.class);
if (result == null) {
throw new Exception("调用接口失败!");
}

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//RabbitMQ消费成功了 消息可以删除
channel.basicAck(deliveryTag, false);
}

这样,消息只有正常消费后才会删除。由程序控制。

回调

生产者中加入配置:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
host: 192.168.6.238
port: 5672
username: guest
password: guest
#开启回调
publisher-confirms: true
publisher-returns: true

类修改,即可有回调信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Service
public class FanoutProducer implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate amqpTemplate;
@Autowired
public FanoutProducer(RabbitTemplate rabbitTemplate) {
this.amqpTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
public void send(String msg) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "404914989@qq.com");
jsonObject.put("msg",msg);
jsonObject.put("Time", System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
System.out.println("jsonString:" + jsonString);
// 设置消息唯一id 保证每次重试消息id唯一
String uuid = UUID.randomUUID().toString();
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
.setMessageId(uuid).build();
CorrelationData correlationData = new CorrelationData(uuid);
amqpTemplate.convertAndSend("email_queue", message,correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if(correlationData!=null){
System.out.println("confirm: " + correlationData.getId() + ",s=" + s + ",b:" + b);
}
System.out.println("回调");
}
/*
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String orderId = correlationData.getId();
System.out.println("消息id:" + correlationData.getId());
if (ack) { //消息发送成功
System.out.println("消息发送确认成功");
} else {
//重试机制
send(orderId);
System.out.println("消息发送确认失败:" + cause);
}
}*/
}

或者在send方法中加入:

1
2
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);

demo见:https://github.com/huingsn/tech-point-record

快速搭建ELK日志分析系统

发表于 2019-03-22 | 分类于 Elasticsearch

https://blog.csdn.net/e_wsq/article/details/81303713

elasticsearch 查询优化建议(转)

发表于 2019-03-19 | 分类于 Elasticsearch

1:优化mapping 主要包括 doc_values , index , norms , type的keyword和text // 效果明显

doc_values属性 用于把数据序列化到磁盘,使索引结构更紧密

默认为true,binary类型为false

缺点:产生额外磁盘消耗

index 属性 用于是否对数据索引,对于一些没必要的数据不必要进行索引检索

默认都为true

缺点:对与无索引的对象无法使用条件过滤和查询

norms属性

不必要聚合的属性可以设置为false

keyword和text是String的拓展,5X之后使用这两个属性

keyword属性为对数据不建倒排类似以前的String: { "index": "not_analyzed"},text为对数据进行分词倒排

效果:能极大的减少磁盘空间

2 : 禁用 查询中的 _all , 设置合理的shard分片数 ,使用segment段落合并 // 效果明显

_all 默认打开,会把所有字段都在拷贝到这里,增加磁盘使用和影响查询性能

创建索引时添加 “_all”:{“enabled”:false}

shard

一个Shard就是一个Lucene实例,是一个完整的搜索引擎。

分片数过多会导致检索时打开比较多的文件,多台服务器之间通讯成本加大。

而分片数过少会导至单个分片索引过大,所以检索速度也会慢。

建议单个分片最多存储10G-20G左右的索引数据,每个实例的每个索引分片数建议在1-2个左右,并且尽量集群的所有节点

都分片数一致,不要出现分片数不一样导致的一个实例负载过大,等待合并的时间变长;

shard副本

使用副本的优点:数据备份,提高对大索引的查询效率,建议副本在1-2个左右,过多的副本会延迟合并时间以及磁盘使用率提高,性价比不高

segments

每个分片包含多个segment,每一个segment都是一个倒排索引;在查询的时,会把所有的segment查询结果汇总归并后最为最终的分片查询结果返回; segment越多,加载到内存中的segment越多,占用segment memory越多,查询性能可能就会下降,因此应该合并小的segment,减小segment数,提高检索的segment数来提高查询效率

创建索引的时候,elasticsearch会把文档信息写到内存bugffer中,elasticsearch定期会执行flush操作,把segment持久化到磁盘上,索引越大,segment越多,查询效率就会下降,由于segment创建是只是写入缓存,这时期容易系统异常容易丢失数据,可手动执行_flush来实现段落持久化

—- 合并索引段落语句

curl -XPOST ‘http://localhost:9200/{_index}/_forcemerge?max_num_segments=1'

—-并且每个es实例不要超过32G的jvm内存

———————以上策略 能提高不少的查询效率

3:避免内存交换 ,bootstrap.mlockall设置为true来实现 // 意义不大,或许是测试数据集1000w不够大,无法实现

在elasticsearch.yml里添加

bootstrap.memory_lock: true

1./etc/security/limits.conf ,不限制Es启动用户(如xxx)的memlock
            xxx soft memlock unlimited
            xxx hard memlock unlimited
 2.修改:/etc/sysctl.conf

            vm.swappiness=0   

  ---------延迟刷新时间或禁用刷新
           curl -XGET 'localhost:9200/novehicle-new/_settings' -d '{"refresh_interval": -1}'

4 : 优化query,查询的query返回必要字段,不用的字段,减小返回值大小

5:优化日志输出等级,把trance改成info // 好像效果不大

6: 路由优化 // 还在测试

 ES中所谓的路由和IP网络不同,是一个类似于Tag的东西。在创建文档的时候,可以通过字段为文档增加一个路由属性的Tag。ES内在机制决定了拥有相同路由属性的文档,一定会被分配到同一个分片上,无论是主分片还是副本。那么,在查询的过程中,一旦指定了感兴趣的路由属性,ES就可以直接到相应的分片所在的机器上进行搜索,而避免了复杂的分布式协同的一些工作,从而提升了ES的性能。于此同时,假设机器1上存有路由属性A的文档,机器2上存有路由属性为B的文档,那么我在查询的时候一旦指定目标路由属性为A,即使机器2故障瘫痪,对机器1构不成很大影响,所以这么做对灾况下的查询也提出了解决方案。所谓的路由,本质上是一个分桶(Bucketing)操作。当然,查询中也可以指定多个路由属性,机制大同小异。

7: 添加查询缓存,和预处理查询 // 测试中

8: 利用磁盘缓存提高检索

磁盘检索速度过慢,对于实时性较高的场景无法运用磁盘检索 ;所以索引处理中,需要把索引文件刷新加载到缓存中 , Elasticsearch默认1s的时间间隔,这也就是说相当于是实时搜索的,Elasticsearch也提供了单独的/_reflush接口,用户如果对1s间隔还是不太满意,可以主动调用接口来保证搜索可见。

— 刷新所有索引
POST /_refresh
— 指定索引刷新
POST /{_index}/_refresh

一般来说我们会通过/_settings接口或者定制template的方式,加大refresh_interval参数:

— 禁用自动refresh
PUT /{_index}/_settings{ “refresh_interval”: -1 }
— 设置每秒刷新
PUT /{_index}/_settings{ “refresh_interval”: “1s” }

9:控制translog

既然refresh只是写到文件系统缓存中,那么最后一步写到实际磁盘又是由什么来控制的呢?如果这期间发生主机错误、硬盘故障等异常情况,数据会不会丢失?这里,其实Elasticsearch提供了另一个机制来控制。Elasticsearch也把数据写入到内存buffer的同时,其实还另外记录了一个treanslog的日志。也就是说,在内存数据进入到buffer这一步骤时,其实还另外记录了一个translog记录。

10:动态关闭不必要的索引 // 好像没太大意义

索引默认处于open状态,处于open状态的索引都会占用内存,对于不必要的索引可以close

curl -XPOST ‘http://localhost:9200/{_index}/_close'

11 : 删除索引的注意点 // 测试中

当一个文档被更新,旧版本的文档被标记为删除,新版本的文档在新的段中索引。也许该文档的不同版本都会匹配一个查询,但是老版本会从结果中删除,

还是参与查询,影响检索效率;

删除文档在es时时不会马上删除,而是先生成.del文件,es在检索时会先判断文件是否删除再过滤,这样会降低检索效率,可手动执行删除文档

curl -XPOST ‘http://localhost:9200/{_index}/_forcemerge?only_expunge_deletes=true'

12 : 当要导入大量数据时,设置副本为0,之后动态添加副本 // 效率较大

当导入大量索引时,设置了副本数,es会同时打开副本同步,消耗系统资源,同时需要额外提供主副之间的通信

新建索引是可设置副本为0

—-设置副本数

curl -XPOST ‘http://localhost:9200/{_index}/_settings' -d

‘{“index”:{“number_of_replicas”:1}}’

13:给文件系统缓存大内存 至少给可用内存的一半到文件系统缓存。

14: 避免链接,嵌套会使查询慢几倍,而亲自关系能使查询慢几百倍,所以如果同样的问题可以通过没有链接的非规范回答就可以提升速度。

15:使用最小的足够用的数值类型

byte,short,integer,long

half_float,float,double

16: 索引缓冲大小

indices.memory.index_buffer_size通常是JVM的0.1,确保他足够处理至多512MB的索引。

17:优化es的线程池

threadpool.index.type: fixed

threadpool.index.size: 100

threadpool.index.queue_size: 500

18:采用G1垃圾回收机制代替默认CMS

JAVA_OPTS=”$JAVA_OPTS -XX:+UseG1GC”

JAVA_OPTS=”$JAVA_OPTS -XX:MaxGCPauseMillis=200”

19: 清理掉没用的缓存

缓存类型设置为Soft Reference,只有当内存不够时才会进行回收

index.cache.field.max_size: 50000

index.cache.field.expire: 10m

index.cache.field.type: soft

原文链接:https://blog.csdn.net/ailice001/article/details/79664455

elasticsearch 常用查询语句

发表于 2019-03-19 | 分类于 Elasticsearch

整理常用的es查询语句: 基于kibana的Dev Tools控制板

索引相关查询

//查询所有索引及容量

GET _cat/indices

//查询索引映射结构
GET my_index/_mapping

// 查询所有索引映射结构

GET _all

// 查询所有的相同前缀索引

GET my-*/_search

// 查询所有索引模板

GET _template

// 查询具体索引模板

GET _template/my_template

集群相关

//查询集群健康状态

GET _cluster/health

// 查询所有节点

GET _cat/nodes

// 查询索引及分片的分布
GET _cat/shards

// 查询所有插件

GET _cat/plugins

写入模块

写入索引模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
PUT _template/my_template
{
"template" : "my-*",
"order" : 0,
"settings" : {
"number_of_shards" : 10,
"number_of_replicas" : 0
},
"mappings": {

"default": {

"_all": {
"enabled": false
},
"properties": {
"name": {
"type": "text"
},
"age": {
"type": "long"
}
}
}
}
}

创建索引映射结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT my_index
{
"mappings": {
"doc": {
"properties": {
"name": {
"type": "text"
},
"blob": {
"type": "binary"
}
}
}
}
}

写入索引

1
2
3
4
5
PUT my_index/doc/1
{
"name": "Some binary blob",
"blob": "U29tZSBiaW5hcnkgYmxvYg=="
}

删除

1
2
3
4
5
6
7
// 索引

DELETE my-index

// 模板

DELETE _template/my_template

DSL query查询

使用本地插件查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
{

"size": 10,

"from": 0,

"query": {
"function_score": {
"script_score": {
"script": {
"inline": "featurescore",
"lang": "native",
"params": {
"name": "you",
"age": "20"
}
}
},
"query": {
"bool": {
"filter": {
"term": {
"name": "you"
}
}
}
}
}
},
"_source": {
"includes": ["name", "age"]
},
"sort": {
"_score": {
"order": "asc"
}
}
}

// 说明 inline 指定插件名 lang指定插件形式 native是本地插件 param定义参数 插件里使用XContentMapValues.nodeStringValue(params.get("name"), null)获取 , elasticseach里存储的字段值使用 source().get("name") 来获取,插件会并行处理es中每一条数据 ;

_source 指定返回字段 , sort 指定插件处理结果的排序字段

基础query

1
2
3
4
5
6
7
//查询所有
GET _search
{
"query": {
"match_all": {}
}
}

// 查询单个索引 的 固定属性

精确匹配

1
2
3
4
5
6
GET _search
{
"query": {
"term": { "name" : "you" }
}
}

模糊匹配

1
2
3
4
5
6
GET _search
{
"query": {
"match": { "name" : "you" }
}
}

范围查找

1
2
3
4
5
6
7
8
9
GET _search
{
"query": {
"range": {
"age":{ "gte" : 15 , "lte" : 25 }
}
}
}
// 功能性查询

过滤

1
2
3
4
5
6
7
8
9
10
GET my_index/_search
{
"query": {
"bool": {
"filter": {
"term":{"age":1095}
}
}
}
}

或 or

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET my - test / _search {
"query": {
"bool": {
"should": [{
"term": {
"name": "you"
}
}, {
"match": {
"age": 20
}
}]
}
}
}

与 AND

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
GET my-test/_search
{
"query": {
"bool": {
"must" : [{
"match" : {
"name" : "you"
}
},{
"range":{
"age":{
"from" : 10 , "to" : 20
}
}
}]
}
}
}

必须 =

1
2
3
4
5
6
7
8
9
10
11
12
GET my_index/_search
{
"query": {
"bool": {
"must" : {
"range" : {
"age" : { "from" : 10, "to" : 20 }
}
}
}
}
}

必须不 not

1
2
3
4
5
6
7
8
9
10
11
12
GET my_index/_search
{
"query": {
"bool": {
"must_not" : {
"term" : {
"name" : "you"
}
}
}
}
}

复合查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
GET my_index/_search 
{
"query": {
"bool": {
"should": [{
"match": {
"age": 40
}
},
{
"match": {
"age": 20
}
}],
"filter": {
"match":{
"name":"you"
}
}
}
}
}

索引迁移

1
2
3
4
5
6
7
8
9
10
---场景 从A索引 复制到B索引
POST _reindex
{
"source": {
"index": "my_index"
},
"dest": {
"index": "new_my_index"
}
}

基于查询的删除

1
2
3
4
5
6
7
8
9
POST test-index/_delete_by_query
{
"query":{
"term": {
"cameraId":"00000000002"
}
}

}

查询

1
2
3
4
5
6
7
8
GET test-index/_search
{
"query":{
"term": {
"cameraId":"00000000002"
}
}
}

ES的因果介绍(转)

发表于 2019-03-19 | 分类于 Elasticsearch

1、大规模数据如何检索

1)用什么数据库好?(MySQL、Oracle、达梦、神通、MongoDB、Hbase…)
2)如何解决单点故障;(lvs、F5、A10、Zookeep、MQ)
3)如何保证数据安全性;(热备、冷备、异地多活)
4)如何解决检索难题;(数据库代理中间件:mysql-proxy、Cobar、MaxScale等;)
5)如何解决统计分析问题;(离线、近实时)

2、传统数据库的应对解决方案

对于关系型数据,解决要点:
1)通过主从备份解决数据安全性问题;
2)通过数据库代理中间件心跳监测,解决单点故障问题;
3)通过代理中间件将查询语句分发到各个slave节点进行查询,并汇总结果
非关系型数据库的解决方案,要点:
1)通过副本备份保证数据安全性;
2)通过节点竞选机制解决单点问题;
3)先从配置库检索分片信息,然后将请求分发到各个节点,最后由路由节点合并汇总结果。

假如把数据完全放入内存,成本超级高。所以数据放在内存也好,不放在内存也好,都不能完完全全解决问题。
为解决以上问题,从源头着手分析,通常会从以下方式来寻找方法:
1、存储数据时按有序存储;
2、将数据和索引分离;
3、压缩数据;
这就引出了Elasticsearch。

3、ES描述

ES=elaticsearch简写, Elasticsearch是一个开源的高扩展的分布式全文检索引擎,它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。
Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。

Lucene与ES关系:
1)Lucene只是一个库。想要使用它,你必须使用Java来作为开发语言并将其直接集成到你的应用中,更糟糕的是,Lucene非常复杂,你需要深入了解检索的相关知识来理解它是如何工作的。
2)Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。

ES主要解决问题:
1)检索相关数据;
2)返回统计结果;
3)速度要快。

ES工作原理
当ElasticSearch的节点启动后,它会利用多播(multicast)(或者单播,如果用户更改了配置)寻找集群中的其它节点,并与之建立连接。

ES核心概念
1)Cluster:集群。
ES可以作为一个独立的单个搜索服务器。不过,为了处理大型数据集,实现容错和高可用性,ES可以运行在许多互相合作的服务器上。这些服务器的集合称为集群。

2)Node:节点。
形成集群的每个服务器称为节点。

3)Shard:分片。
当有大量的文档时,由于内存的限制、磁盘处理能力不足、无法足够快的响应客户端的请求等,一个节点可能不够。这种情况下,数据可以分为较小的分片。每个分片放到不同的服务器上。
当你查询的索引分布在多个分片上时,ES会把查询发送给每个相关的分片,并将结果组合在一起,而应用程序并不知道分片的存在。即:这个过程对用户来说是透明的。

4)Replia:副本。
为提高查询吞吐量或实现高可用性,可以使用分片副本。
副本是一个分片的精确复制,每个分片可以有零个或多个副本。ES中可以有许多相同的分片,其中之一被选择更改索引操作,这种特殊的分片称为主分片。
当主分片丢失时,如:该分片所在的数据不可用时,集群将副本提升为新的主分片。

5)全文检索。
全文检索就是对一篇文章进行索引,可以根据关键字搜索,类似于mysql里的like语句。
全文索引就是把内容根据词的意义进行分词,然后分别创建索引,例如”你们的激情是因为什么事情来的” 可能会被分词成:“你们“,”激情“,“什么事情“,”来“ 等token,这样当你搜索“你们” 或者 “激情” 都会把这句搜出来。

ELK是什么?
ELK=elasticsearch+Logstash+kibana
elasticsearch:后台分布式存储以及全文检索
logstash: 日志加工、“搬运工”
kibana:数据可视化展示。
ELK架构为数据分布式存储、可视化查询和日志解析创建了一个功能强大的管理链。 三者相互配合,取长补短,共同完成分布式大数据处理工作。

ES特点和优势
1)分布式实时文件存储,可将每一个字段存入索引,使其可以被检索到。
2)实时分析的分布式搜索引擎。
分布式:索引分拆成多个分片,每个分片可有零个或多个副本。集群中的每个数据节点都可承载一个或多个分片,并且协调和处理各种操作;
负载再平衡和路由在大多数情况下自动完成。
3)可以扩展到上百台服务器,处理PB级别的结构化或非结构化数据。也可以运行在单台PC上(已测试)
4)支持插件机制,分词插件、同步插件、Hadoop插件、可视化插件等。

4、ES性能

性能结果展示
(1)硬件配置:
CPU 16核 AuthenticAMD
内存 总量:32GB
硬盘 总量:500GB 非SSD

在上述硬件指标的基础上测试性能如下:
1)平均索引吞吐量: 12307docs/s(每个文档大小:40B/docs)
2)平均CPU使用率: 887.7%(16核,平均每核:55.48%)
3)构建索引大小: 3.30111 GB
4)总写入量: 20.2123 GB
5)测试总耗时: 28m 54s.

性能esrally工具(推荐)
使用参考:http://blog.csdn.net/laoyang360/article/details/52155481

为什么要用ES?
1) 2013年初,GitHub抛弃了Solr,采取ElasticSearch 来做PB级的搜索。 “GitHub使用ElasticSearch搜索20TB的数据,包括13亿文件和1300亿行代码”。
2)维基百科:启动以elasticsearch为基础的核心搜索架构。
3)SoundCloud:“SoundCloud使用ElasticSearch为1.8亿用户提供即时而精准的音乐搜索服务”。
4)百度:百度目前广泛使用ElasticSearch作为文本数据分析,采集百度所有服务器上的各类指标数据及用户自定义数据,通过对各种数据进行多维分析展示,辅助定位分析实例异常或业务层面异常。目前覆盖百度内部20多个业务线(包括casio、云分析、网盟、预测、文库、直达号、钱包、风控等),单集群最大100台机器,200个ES节点,每天导入30TB+数据。

我们也需要
实际项目开发实战中,几乎每个系统都会有一个搜索的功能,当搜索做到一定程度时,维护和扩展起来难度就会慢慢变大,所以很多公司都会把搜索单独独立出一个模块,用ElasticSearch等来实现。

近年ElasticSearch发展迅猛,已经超越了其最初的纯搜索引擎的角色,现在已经增加了数据聚合分析(aggregation)和可视化的特性,如果你有数百万的文档需要通过关键词进行定位时,ElasticSearch肯定是最佳选择。当然,如果你的文档是JSON的,你也可以把ElasticSearch当作一种“NoSQL数据库”, 应用ElasticSearch数据聚合分析(aggregation)的特性,针对数据进行多维度的分析。

【知乎:热酷架构师潘飞】ES在某些场景下替代传统DB
个人以为Elasticsearch作为内部存储来说还是不错的,效率也基本能够满足,在某些方面替代传统DB也是可以的,前提是你的业务不对操作的事性务有特殊要求;而权限管理也不用那么细,因为ES的权限这块还不完善。
由于我们对ES的应用场景仅仅是在于对某段时间内的数据聚合操作,没有大量的单文档请求(比如通过userid来找到一个用户的文档,类似于NoSQL的应用场景),所以能否替代NoSQL还需要各位自己的测试。
如果让我选择的话,我会尝试使用ES来替代传统的NoSQL,因为它的横向扩展机制太方便了。

5、ES的应用场景

通常我们面临问题有两个:
1)新系统开发尝试使用ES作为存储和检索服务器;
2)现有系统升级需要支持全文检索服务,需要使用ES。
以上两种架构的使用,以下链接进行详细阐述。
http://blog.csdn.net/laoyang360/article/details/52227541

一线公司ES使用场景:
1)新浪ES 如何分析处理32亿条实时日志 http://dockone.io/article/505
2)阿里ES 构建挖财自己的日志采集和分析体系 http://afoo.me/columns/tec/logging-platform-spec.html
3)有赞ES 业务日志处理 http://tech.youzan.com/you-zan-tong-ri-zhi-ping-tai-chu-tan/
4)ES实现站内搜索 http://www.wtoutiao.com/p/13bkqiZ.html

6、如何部署ES

6.1 ES部署(无需安装)
1)零配置,开箱即用
2)没有繁琐的安装配置
3)java版本要求:最低1.7
我使用的1.8
[root@lyng config_lhy]# echo $JAVA_HOME
/opt/jdk1.8.0_91
4)下载地址:
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.3.5/elasticsearch-2.3.5.zip
5)启动
cd /usr/local/elasticsearch-2.3.5
./bin/elasticsearch
bin/elasticsearch -d(后台运行)

6.2 ES必要的插件
必要的Head、kibana、IK(中文分词)、graph等插件的详细安装和使用。
http://blog.csdn.net/column/details/deep-elasticsearch.html

6.3 ES windows下一键安装
自写bat脚本实现windows下一键安装。
1)一键安装ES及必要插件(head、kibana、IK、logstash等)
2)安装后以服务形式运行ES。
3)比自己摸索安装节省至少2小时时间,效率非常高。
脚本说明:
http://blog.csdn.net/laoyang360/article/details/51900235

7、ES对外接口

1)JAVA API接口
http://www.ibm.com/developerworks/library/j-use-elasticsearch-java-apps/index.html

2)RESTful API接口
常见的增、删、改、查操作实现:
http://blog.csdn.net/laoyang360/article/details/51931981

ES遇到问题怎么办:
1)国外:https://discuss.elastic.co/
2)国内:http://elasticsearch.cn/

参考:
[1] http://www.tuicool.com/articles/7fueUbb
[2] http://zhaoyanblog.com/archives/495.html
[3]《Elasticsearch服务器开发》
[4]《实战Elasticsearch、Logstash、Kibana》
[5]《Elasticsearch In Action》

Elasticsearch集群管理

发表于 2019-03-18 | 分类于 Elasticsearch

集群规划

搭建一个集群我们需要考虑如下几个问题:

1
2
3
4
5
1. 我们需要多大规模的集群?
2. 集群中的节点角色如何分配?
3. 如何避免脑裂问题?
4. 索引应该设置多少个分片?
5. 分片应该设置几个副本?

我们需要多大规模的集群

需要从以下两个方面考虑:

当前的数据量有多大?数据增长情况如何?

你的机器配置如何?cpu、多大内存、多大硬盘容量?

推算的依据:

1
2
ES JVM heap 最大可以设置32G 。
30G heap 大概能处理的数据量 10 T。如果内存很大如128G,可在一台机器上运行多个ES节点实例。

备注:集群规划满足当前数据规模+适量增长规模即可,后续可按需扩展。

两类应用场景:

1
2
A. 用于构建业务搜索功能模块,且多是垂直领域的搜索。数据量级几千万到数十亿级别。一般2-4台机器的规模。
B. 用于大规模数据的实时OLAP(联机处理分析),经典的如ELK Stack,数据规模可能达到千亿或更多。几十到上百节点的规模。

集群中的节点角色如何分配

** 节点角色 **

1
2
3
4
5
6
Master
node.master: true 节点可以作为主节点
DataNode
node.data: true 默认是数据节点。
Coordinate node 协调节点
如果仅担任协调节点,将上两个配置设为false。

说明:

一个节点可以充当一个或多个角色,默认三个角色都有

协调节点:一个节点只作为接收请求、转发请求到其他节点、汇总各个节点返回数据等功能的节点。

** 如何分配 **

1
2
A. 小规模集群,不需严格区分。
B. 中大规模集群(十个以上节点),应考虑单独的角色充当。特别并发查询量大,查询的合并量大,可以增加独立的协调节点。角色分开的好处是分工分开,不互影响。如不会因协调角色负载过高而影响数据节点的能力。

如何避免脑裂问题

脑裂问题:一个集群中只有一个A主节点,A主节点因为需要处理的东西太多或者网络过于繁忙,从而导致其他从节点ping不通A主节点,这样其他从节点就会认为A主节点不可用了,就会重新选出一个新的主节点B。过了一会A主节点恢复正常了,这样就出现了两个主节点,导致一部分数据来源于A主节点,另外一部分数据来源于B主节点,出现数据不一致问题,这就是脑裂。

尽量避免脑裂,需要添加最小数量的主节点配置:discovery.zen.minimum_master_nodes: (有master资格节点数/2) + 1

这个参数控制的是,选举主节点时需要看到最少多少个具有master资格的活节点,才能进行选举。官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量。

** 常用做法 **(中大规模集群):

    1. Master 和 dataNode 角色分开,配置奇数个master
    1. 单播发现机制,配置master资格节点:
      1
      2
      discovery.zen.ping.multicast.enabled: false —— 关闭多播发现机制,默认是关闭的
      discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"] —— 配置单播发现的主节点ip地址,其他从节点要加入进来,就得去询问单播发现机制里面配置的主节点我要加入到集群里面了,主节点同意以后才能加入,然后主节点再通知集群中的其他节点有新节点加入
    1. 配置选举发现数,及延长ping master的等待时长
      1
      2
      discovery.zen.ping_timeout: 30(默认值是3秒)——其他节点ping主节点多久时间没有响应就认为主节点不可用了
      discovery.zen.minimum_master_nodes: 2 —— 选举主节点时需要看到最少多少个具有master资格的活节点,才能进行选举

索引应该设置多少个分片

说明:分片数指定后不可变,除非重索引。

思考:

分片对应的存储实体是什么?存储的实体是索引

分片是不是越多越好?  不是

分片多有什么影响?  分片多浪费存储空间、占用资源、影响性能

  • 分片过多的影响
    1
    2
    3
    每个分片本质上就是一个Lucene索引, 因此会消耗相应的文件句柄, 内存和CPU资源。
    每个搜索请求会调度到索引的每个分片中. 如果分片分散在不同的节点倒是问题不太. 但当分片开始竞争相同的硬件资源时, 性能便会逐步下降。
    ES使用词频统计来计算相关性. 当然这些统计也会分配到各个分片上. 如果在大量分片上只维护了很少的数据, 则将导致最终的文档相关性较差。
  • 分片设置的可参考原则
    1
    2
    3
    ElasticSearch推荐的最大JVM堆空间是30~32G, 所以把你的分片最大容量限制为30GB, 然后再对分片数量做合理估算. 例如, 你认为你的数据能达到200GB, 推荐你最多分配7到8个分片。
    在开始阶段, 一个好的方案是根据你的节点数量按照1.5~3倍的原则来创建分片. 例如,如果你有3个节点, 则推荐你创建的分片数最多不超过9(3x3)个。当性能下降时,增加节点,ES会平衡分片的放置。
    对于基于日期的索引需求, 并且对索引数据的搜索场景非常少. 也许这些索引量将达到成百上千, 但每个索引的数据量只有1GB甚至更小. 对于这种类似场景, 建议只需要为索引分配1个分片。如日志管理就是一个日期的索引需求,日期索引会很多,但每个索引存放的日志数据量就很少。

分片应该设置几个副本

说明:副本数是可以随时调整的!

思考:

副本的用途是什么?备份数据保证高可用数据不丢失,高并发的时候参与数据查询

针对它的用途,我们该如何设置它的副本数? 一般一个分片有1-2个副本即可保证高可用

集群规模没变的情况下副本过多会有什么影响?  副本多浪费存储空间、占用资源、影响性能

** 副本设置基本原则**

为保证高可用,副本数设置为2即可。要求集群至少要有3个节点,来分开存放主分片、副本。如发现并发量大时,查询性能会下降,可增加副本数,来提升并发查询能力。

注意:新增副本时主节点会自动协调,然后拷贝数据到新增的副本节点

集群搭建

利用docker容器搭建伪集群

在Linux中创建目录,并且配置好:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|-- node1
| |-- config
| | `-- es1.yml
| |-- data
| `-- plugins
|-- node2
| |-- config
| | `-- es1.yml
| |-- data
| `-- plugins
`-- node3
|-- config
| `-- es1.yml
|-- data
`-- plugins

目录文件: 是用来挂载用的,同步配置文件。容器的和外部的同步。

开启防火墙(9300、9301、9302):firewall-cmd –add-port=9301/tcp

分别配置好配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
cluster.name: elasticsearch-cluster
node.name: es-node3
network.bind_host: 0.0.0.0
network.publish_host: 172.17.0.13
http.port: 9202
transport.tcp.port: 9302
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
discovery.zen.ping.unicast.hosts: ["172.17.0.11:9300","172.17.0.12:9301","172.17.0.13:9302"]
discovery.zen.minimum_master_nodes: 2

discovery.zen.minimum_master_nodes: 1 master节点有1个 # discovery.zen.minimum_master_nodes—节点总数/2+1

说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
cluster.name:用于唯一标识一个集群,默认值是:elasticsearch。
node.name:节点名,默认随机指定一个name列表中名字。集群中node名字不能重复

index.number_of_shards: 默认的配置是把索引分为5个分片
index.number_of_replicas:设置每个index的默认的冗余备份的分片数,默认是1

通过 index.number_of_shards,index.number_of_replicas默认设置索引将分为5个分片,每个分片1个副本,共10个结点。

bootstrap.memory_lock: true 当JVM做分页切换(swapping)时,ElasticSearch执行的效率会降低,推荐把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的物理内存分配给ES,同时允许ElasticSearch进程锁住内存

network.bind_host: 设置可以访问的ip,可以是ipv4或ipv6的,默认为0.0.0.0,这里全部设置通过

network.publish_host:设置其它结点和该结点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址

同时设置bind_host和publish_host两个参数可以替换成network.host
network.bind_host: 172.17.0.11
network.publish_host: 172.17.0.11
=>network.host: 172.17.0.11

http.port:设置对外服务的http端口,默认为9200

transport.tcp.port: 设置节点之间交互的tcp端口,默认是9300

http.cors.enabled: 是否允许跨域REST请求

http.cors.allow-origin: 允许 REST 请求来自何处

node.master: true 配置该结点有资格被选举为主结点(候选主结点),用于处理请求和管理集群,false为不具有选举的资格。

node.data: true 配置该结点是数据结点,用于保存数据,执行数据相关的操作(CRUD,Aggregation);

discovery.zen.minimum_master_nodes:
自动发现master节点的最小数,如果这个集群中配置进来的master节点少于这个数目,es的日志会一直报master节点数目不足,默认为1。
为了避免脑裂,个数请遵从该公式 => (totalnumber of master-eligible nodes / 2 + 1)。
脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和Slave误以为出现两个active master,最终使得整个集群处于混乱状态。

discovery.zen.ping.unicast.hosts: 集群个节点IP地址,也可以使用es-node等名称,需要各节点能够解析

分别启动:

1
2
3
4
5
docker run -d -p 9200:9200 -p 9300:9300 -v /root/es-cluster/node1/config/es1.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /root/es-cluster/node1/plugins:/usr/share/elasticsearch/plugins -v /root/es-cluster/node1/data:/usr/share/elasticsearch/data -e ES_JAVA_OPTS="-Xms512m -Xmx512m" --name es-node1 bdaab402b220

或:

docker run -d -e ES_JAVA_OPTS="-Xms512m -Xmx512m" -p 9200:9200 -p 9300:9300 --mount type=bind,source=/root/es-cluster/node1/config/es1.yml,target=/usr/share/elasticsearch/config/elasticsearch.yml --mount type=bind,source=/root/es-cluster/node1/plugins,target=/usr/share/elasticsearch/plugins --mount type=bind,source=/root/es-cluster/node1/data,target=/usr/share/elasticsearch/data --name es-node1 bdaab402b220

docker run -d -e ES_JAVA_OPTS=”-Xms512m -Xmx512m” -p 9201:9201 -p 9301:9301 -v /root/es-cluster/node2/config/es1.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /root/es-cluster/node2/plugins:/usr/share/elasticsearch/plugins -v /root/es-cluster/node2/data:/usr/share/elasticsearch/data –name es-node2 bdaab402b220

docker run -d -e ES_JAVA_OPTS=”-Xms512m -Xmx512m” -p 9202:9202 -p 9302:9302 -v /root/es-cluster/node3/config/es1.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /root/es-cluster/node3/plugins:/usr/share/elasticsearch/plugins -v /root/es-cluster/node3/data:/usr/share/elasticsearch/data –name es-node3 bdaab402b220

出现的问题:

1、Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error=’Cannot allocate memory’ (errno=12)

解决办法是,运行的时间添加设定每个容器的 ES_JAVA_OPTS=”-Xms256m -Xmx256m”

2、bootstrap checks failed max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

** 调高JVM线程数限制数量 **

1
2
3
4
5
6
7
在centos窗口中,修改配置sysctl.conf
vim /etc/sysctl.conf

加入如下内容:
vm.max_map_count=262144

启用配置:sysctl -p

3、OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

增加data权限 ,如果有can not run elasticsearch as root则需要使用切换普通用户操作。;

4、docker WARNING: IPv4 forwarding is disabled

1
2
3
4
在宿主机上面执行:
net.ipv4.ip_forward=1 >> /usr/lib/sysctl.d/00-system.conf
重启network和docker服务
systemctl restart network && systemctl restart docker

5、Caused by: java.io.IOException: No route to host

data文件中的节点数据,需要把示例二data文件下的文件清空。rm -rf ./node/data/

正式集群

  1. 准备3台虚拟机:

  2. 168.152.128 、192.168.152.129、192.168.152.130

  3. 在3台虚拟机里面都安装好elasticsearch
    安装教程参考我之前写的文章的ES的安装和配置部分:

https://www.cnblogs.com/leeSmall/p/9189078.html

  1. 修改3台虚拟机下ES的配置,使得它们组成一个集群
    进入elasticsearch的config目录,修改elasticsearch.yml的配置

3.1. IP访问限制、默认端口修改9200
这里有两个需要提醒下,第一个就是IP访问限制,第二个就是es实例的默认端口号9200。IP访问限制可以限定具体的IP访问服务器,这有一定的安全过滤作用。

#Set the bind address to a specific IP (IPv4 or IPv6):

network.host: 192.168.152.128
如果设置成0.0.0.0则是不限制任何IP访问。一般在生产的服务器可能会限定几台IP,通常用于管理使用。

默认的端口9200在一般情况下也有点风险,可以将默认的端口修改成另外一个,这还有一个原因就是怕开发人员误操作,连接上集群。当然,如果你的公司网络隔离做的很好也无所谓。

#Set a custom port for HTTP:

http.port: 9200
transport.tcp.port: 9300
这里的9300是集群内部通讯使用的端口,这个也可以修改掉。因为连接集群的方式有两种,通过扮演集群node也是可以进入集群的,所以还是安全起见,修改掉默认的端口。

说明:记得修改安装了ES的3台虚拟机(三个节点)的相同配置,要不然节点之间无法建立连接工作,也会报错。

3.2 集群发现IP列表、node、cluster名称
紧接着修改集群节点IP地址,这样可以让集群在规定的几个节点之间工作。elasticsearch,默认是使用自动发现IP机制。就是在当前网段内,只要能被自动感知到的IP就能自动加入到集群中。这有好处也有坏处。好处就是自动化了,当你的es集群需要云化的时候就会非常方便。但是也会带来一些不稳定的情况,如,master的选举问题、数据复制问题。

导致master选举的因素之一就是集群有节点进入。当数据复制发生的时候也会影响集群,因为要做数据平衡复制和冗余。这里面可以独立master集群,剔除master集群的数据节点能力。

固定列表的IP发现有两种配置方式,一种是互相依赖发现,一种是全量发现。各有优势吧,我是使用的依赖发现来做的。这有个很重要的参考标准,就是你的集群扩展速度有多快。因为这有个问题就是,当全量发现的时候,如果是初始化集群会有很大的问题,就是master全局会很长,然后节点之间的启动速度各不一样。所以我采用了靠谱点的依赖发现。

你需要在192.168.152.128的elasticsearch中配置成:

#——————————— Discovery ———————————-

#Pass an initial list of hosts to perform discovery when new node is started:
#The default list of hosts is [“127.0.0.1”, “[::1]”]

discovery.zen.ping.unicast.hosts: [ “192.168.152.129:9300”,”192.168.152.130:9300” ]
让他去发现129,130的机器,以此内推,完成剩下的129和130机器的配置。

然后你需要配置下集群名称,就是你当前节点所在集群的名称,这有助于你规划你的集群。集群中的所有节点的集群名称必须一样,只有集群名称一样才能组成一个逻辑集群。

#———————————- Cluster ———————————–

#Use a descriptive name for your cluster:

cluster.name: mycluster
配置你当前节点的名称

#———————————— Node ————————————

#Use a descriptive name for the node:

node.name: node-1
以此类推,完成另外两个节点的配置。cluster.name的名称必须保持一样。然后分别设置node.name。

说明:

这里搭建的是一个简单的集群,没有做集群节点角色的区分,所以3个节点默认的角色有主节点、数据节点、协调节点

选举ES主节点的逻辑:

选举的大概逻辑,它会根据分片的数据的前后新鲜程度来作为选举的一个重要逻辑。(日志、数据、时间都会作为集群master全局的重要指标)

因为考虑到数据一致性问题,当然是用最新的数据节点作为master,然后进行新数据的复制和刷新其他node。

2、[WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [] uncaught exception in thread [main] org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root

新的版本安全级别 提高了,不允许采用root启动,我们需要添加一个新的用户。

1
2
3
4
5
6
7
1、创建elasticsearch用户组:[root@localhost /]# groupadd elasticsearch
2、创建elasticsearch用户:[root@localhost elasticsearch-6.1.1]# useradd elasticsearch -g elasticsearch -p elasticsearch
3、更改 elasticsearch-6.1.1文件夹下所有文件的所属用户和组分别为elasticsearch、elasticsearch。命令: chown -R elasticsearch.elasticsearch *(chown将指定文件的拥有者,-R代表处理指定目录以及子目录下的所有文件)
4、给elasticsearch用户分配读写“/usr/local/fast/elasticsearch-6.1.1/data”的权限 命令:chown -R elasticsearch:elasticsearch elasticsearch-6.1.1
[1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536],修改/etc/security/limits.conf,添加如下内容,如果不成功的话可以尝试ulimit -n 65536。
[2]: max number of threads [794] for user [elasticsearch] is too low, increase to at least [4096],修改/etc/security/limits.d/20-nproc.conf和/etc/security/limits.conf。
[3]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144],修改/etc/sysctl.conf文件,具体如下。修改完成后,输入sysctl -a 命令。

集群管理

  1. 监控API
    http://localhost:9200/_cat

复制代码
GET /_cat

/_cat/health
/_cat/nodes
/_cat/master
/_cat/indices
/_cat/allocation
/_cat/shards
/_cat/shards/{index}
/_cat/thread_pool
/_cat/segments
/_cat/segments/{index}
复制代码
2. x-pack
为集群提供安全防护、监控、告警、报告等功能的收费组件;
部分免费:https://www.elastic.co/subscriptions
6.3开始已开源,并并入了elasticsearch核心中。

官网安装介绍:

https://www.elastic.co/guide/en/elasticsearch/reference/6.2/installing-xpack-es.html

参考文章:

集群搭建一:https://www.cnblogs.com/wangiqngpei557/p/5967377.html

集群搭建二:https://www.cnblogs.com/jstarseven/p/6803054.html 这一篇文章写得比较详细,同时还总结了搭建过程中遇到的问题,搭建ES集群的话强烈推荐

配置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
cluster.name: ES	ES集群名称,同一个集群内的所有节点集群名称必须保持一致
node.name: slave2 ES集群内的节点名称,同一个集群内的节点名称要具备唯一性
node.master: true 允许节点是否可以成为一个master节点,ES是默认集群中的第一台机器成为master,如果这台机器停止就会重新选举
node.data: false 允许该节点存储索引数据(默认开启)
path.data: ES是搜索引擎,会创建文档,建立索引,此路径是索引的存放目录.可以指定多个存储位置
path.logs: elasticsearch专门的日志存储位置
bootstrap.memory_lock: true 在ES运行起来后锁定ES所能使用的堆内存大小,锁定内存大小一般为可用内存的一半左右;锁定内存后就不会使用交换分区。如果不打开此项,当系统物理内存空间不足,ES将使用交换分区,ES如果使用交换分区,那么ES的性能将会变得很差
network.host: 0.0.0.0 es的HTTP端口和集群通信端口就会监听在此地址上
network.tcp.no_delay: true 是否启用tcp无延迟,true为启用tcp不延迟,默认为false启用tcp延迟
truenetwork.tcp.keep_alive: true 是否启用TCP保持活动状态,默认为true
network.tcp.reuse_address: true 是否应该重复使用地址。默认true,在Windows机器上默认为false
network.tcp.send_buffer_size: 128mb tcp发送缓冲区大小,默认不设置
network.tcp.receive_buffer_size: 128mb tcp接收缓冲区大小,默认不设置
transport.tcp.port: 9301 设置集群节点通信的TCP端口,默认就是9300
transport.tcp.compress: true 设置是否压缩TCP传输时的数据,默认为false
http.max_content_length: 200mb 设置http请求内容的最大容量,默认是100mb
http.cors.enabled: true 是否开启跨域访问
http.cors.allow-origin: "*" 开启跨域访问后的地址限制,*表示无限制
http.port: 9201 定义ES对外调用的http端口,默认是9200
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301","127.0.0.1:9302","127.0.0.1:9303"] 在Elasticsearch7.0版本已被移除,配置错误。写入候选主节点的设备地址,来开启服务时就可以被选为主节点。默认主机列表只有127.0.0.1和IPV6的本机回环地址。上面是书写格式,discover意思为发现,zen是判定集群成员的协议,unicast是单播的意思,ES5.0版本之后只支持单播的方式来进行集群间的通信,hosts为主机
discovery.zen.minimum_master_nodes: 2 在Elasticsearch7.0版本已被移除,配置无效,为了避免脑裂,集群的最少节点数量为,集群的总节点数量除以2加一
discovery.zen.fd.ping_timeout: 120s 在Elasticsearch7.0版本已被移除,配置无效。探测超时时间,默认是3秒,我们这里填120秒是为了防止网络不好的时候ES集群发生脑裂现象
discovery.zen.fd.ping_retries: 6 在Elasticsearch7.0版本已被移除,配置无效。探测次数,如果每次探测90秒,连续探测超过六次,则认为节点该节点已脱离集群,默认为3次
discovery.zen.fd.ping_interval: 15s 在Elasticsearch7.0版本已被移除,配置无效。节点每隔15秒向master发送一次心跳,证明自己和master还存活,默认为1秒太频繁
discovery.seed_hosts: ["127.0.0.1:9301","127.0.0.1:9302","127.0.0.1:9303"] Elasticsearch7新增参数,写入候选主节点的设备地址,来开启服务时就可以被选为主节点,由discovery.zen.ping.unicast.hosts:参数改变而来
cluster.initial_master_nodes: ["127.0.0.1:9301","127.0.0.1:9302"] Elasticsearch7新增参数,写入候选主节点的设备地址,来开启服务时就可以被选为主节点
cluster.fault_detection.leader_check.interval: 15s Elasticsearch7新增参数,设置每个节点在选中的主节点的检查之间等待的时间。默认为1秒
discovery.cluster_formation_warning_timeout: 30s Elasticsearch7新增参数,启动后30秒内,如果集群未形成,那么将会记录一条警告信息,警告信息未master not fount开始,默认为10秒
cluster.join.timeout: 30s Elasticsearch7新增参数,节点发送请求加入集群后,在认为请求失败后,再次发送请求的等待时间,默认为60秒
cluster.publish.timeout: 90s Elasticsearch7新增参数,设置主节点等待每个集群状态完全更新后发布到所有节点的时间,默认为30秒
cluster.routing.allocation.cluster_concurrent_rebalance: 32 集群内同时启动的数据任务个数,默认是2个
cluster.routing.allocation.node_concurrent_recoveries: 32 添加或删除节点及负载均衡时并发恢复的线程个数,默认4个
cluster.routing.allocation.node_initial_primaries_recoveries: 32 初始化数据恢复时,并发恢复线程的个数,默认4个

Elasticsearch使用操作

发表于 2019-03-17 | 分类于 Elasticsearch

ES架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Gateway层:
最底这一层最主要是是做数据持久化的,其实就是数据层,es支持可以在多种文件系统上运行,可以支持各种的存储系统,本地的存储系统、HadoopHDFS等等各种各样的数据存储系统,用于存元数据。

Lucene层
ElasticScarch的底层就是对Lucene的一个封装,它由始至终都要用到Lucene,ElasticScarch在Lucene的基础之上做了一个分布式的框架,把多个Lucene的索引给管理起来,最后做一个非常大规模的搜索引擎。

倒数第三层 index、search、mapping:
index模块: 怎么创建某个索引,数据如何创建,如何存储 都由它来管理。
search模块: 数据的查询由它来完成。
mapping模块: 相单于如何创建一个表的结构、怎么创建这个mapping都由它管理
River模块: 这个模块现在没有了,我记得是数据同步还是什么鬼(反正不重要)

倒数第四层 Discovery、Scripting、3rdPlugins:
Discovery模块: 服务发现模块、当有节点要加入进来需要通过此模块,主要处理节点与节点之间的问题,目前实现有两种方式,分别是ZEN、EC2,用的最多的是ZEN。
Scripting模块: 脚本模块、可以自己写一段脚本代码 来进行元数据的二次处理 返回给前台页面。(性能低最好别用)
3rdPlugins:第三方插件,ElasticScarch支持很多的第三方插件。

Transport层:
Transport层主要用于数据的传输、想访问ElasticScarch,第一步就是要和它进行连接,那么连接的步骤和数据的传输,就在这层。一般我们都用http进行交互。

RESTful 层:访问

** ES支持的客户端连接方式 **

1、REST API ,端口 9200

这种连接方式对应于架构图中的RESTful style API这一层,这种客户端的连接方式是RESTful风格的,使用http的方式进行连接

2、Transport 连接 端口 9300

这种连接方式对应于架构图中的Transport这一层,这种客户端连接方式是直接连接ES的节点,使用TCP的方式进行连接

3、多种编程语言客户端

1
2
3
4
5
6
7
8
9
10
Java REST Client [7.4] — other versions
Java API [7.4] — other versions
JavaScript API [7.x] — other versions
Ruby API [7.x] — other versions
Go API
.NET API [7.x] — other versions
PHP API [7.2] — other versions
Perl API
Python API
Community Contributed Clients

Java REST Client

** ES提供了两个JAVA REST client 版本**

Java Low Level REST Client: 低级别的REST客户端,通过http与集群交互,用户需自己编组请求JSON串,及解析响应JSON串。兼容所有ES版本。

Java High Level REST Client: 高级别的REST客户端,基于低级别的REST客户端,增加了编组请求JSON串、解析响应JSON串等相关api。使用的版本需要保持和ES服务端的版本一致,否则会有版本问题。

** Java Low Level REST Client **

maven 引入、使用介绍: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html

API doc :https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html.

** Java High Level REST Client **

从6.0.0开始加入的,目的是以java面向对象的方式来进行请求、响应处理。每个API 支持 同步/异步 两种方式,同步方法直接返回一个结果对象。异步的方法以async为后缀,通过listener参数来通知结果。
高级java REST 客户端依赖Elasticsearch core project

兼容性说明:依赖 java1.8 和 Elasticsearch core project,请使用与服务端ES版本一致的客户端版本

** Java High Level REST Client maven 集成**

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

** Java High Level REST Client 初始化**

1
2
3
4
5
6
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
//...
client.close();

给定集群的多个节点地址,将客户端负载均衡地向这个节点地址集发请求,API及用法示例,请参考:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html

Java High Level REST Client 使用示例

引入包

maven工程里面引入和ES服务端版本一样的Java客户端

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

初始化客户端

给定集群的多个节点地址,将客户端负载均衡地向这个节点地址集发请求。

1
2
3
4
5
6
7
8
9
10
11
public class InitDemo {

public static RestHighLevelClient getClient() {

RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));

return client;
}
}

Create index 创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class CreateIndexDemo {

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建 创建索引request 参数:索引名mess
CreateIndexRequest request = new CreateIndexRequest("mess");

// 2、设置索引的settings
request.settings(Settings.builder().put("index.number_of_shards", 3) // 分片数
.put("index.number_of_replicas", 2) // 副本数
.put("analysis.analyzer.default.tokenizer", "ik_smart") // 默认分词器
);

// 3、设置索引的mappings
request.mapping("_doc",
" {\n" +
" \"_doc\": {\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }",
XContentType.JSON);

// 4、 设置索引的别名
request.alias(new Alias("mmm"));

// 5、 发送请求
// 5.1 同步方式发送请求
CreateIndexResponse createIndexResponse = client.indices()
.create(request);

// 6、处理响应
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse
.isShardsAcknowledged();
System.out.println("acknowledged = " + acknowledged);
System.out.println("shardsAcknowledged = " + shardsAcknowledged);

// 5.1 异步方式发送请求
/*ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(
CreateIndexResponse createIndexResponse) {
// 6、处理响应
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse
.isShardsAcknowledged();
System.out.println("acknowledged = " + acknowledged);
System.out.println(
"shardsAcknowledged = " + shardsAcknowledged);
}

@Override
public void onFailure(Exception e) {
System.out.println("创建索引异常:" + e.getMessage());
}
};

client.indices().createAsync(request, listener);
*/
} catch (IOException e) {
e.printStackTrace();
}
}
}

index document

索引文档,即往索引里面放入文档数据.类似于数据库里面向表里面插入一行数据,一行数据就是一个文档。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class IndexDocumentDemo {

private static Logger logger = LogManager.getRootLogger();

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {
// 1、创建索引请求
IndexRequest request = new IndexRequest(
"mess", //索引
"_doc", // mapping type
"1"); //文档id

// 2、准备文档数据
// 方式一:直接给JSON串
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);

// 方式二:以map对象来表示文档
/*
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
request.source(jsonMap);
*/

// 方式三:用XContentBuilder来构建文档
/*
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.field("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
request.source(builder);
*/

// 方式四:直接用key-value对给出
/*
request.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
*/

//3、其他的一些可选设置
/*
request.routing("routing"); //设置routing值
request.timeout(TimeValue.timeValueSeconds(1)); //设置主分片等待时长
request.setRefreshPolicy("wait_for"); //设置重刷新策略
request.version(2); //设置版本号
request.opType(DocWriteRequest.OpType.CREATE); //操作类别
*/

//4、发送请求
IndexResponse indexResponse = null;
try {
// 同步方式
indexResponse = client.index(request);
} catch(ElasticsearchException e) {
// 捕获,并处理异常
//判断是否版本冲突、create但文档已存在冲突
if (e.status() == RestStatus.CONFLICT) {
logger.error("冲突了,请在此写冲突处理逻辑!\n" + e.getDetailedMessage());
}

logger.error("索引异常", e);
}

//5、处理响应
if(indexResponse != null) {
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("新增文档成功,处理逻辑代码写到这里。");
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("修改文档成功,处理逻辑代码写到这里。");
}
// 分片处理信息
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {

}
// 如果有分片副本失败,可以获得失败原因信息
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
System.out.println("副本失败原因:" + reason);
}
}
}


//异步方式发送索引请求
/*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {

}

@Override
public void onFailure(Exception e) {

}
};
client.indexAsync(request, listener);
*/

} catch (IOException e) {
e.printStackTrace();
}
}
}

获取文档数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class GetDocumentDemo {

private static Logger logger = LogManager.getRootLogger();

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {
// 1、创建获取文档请求
GetRequest request = new GetRequest(
"mess", //索引
"_doc", // mapping type
"1"); //文档id

// 2、可选的设置
//request.routing("routing");
//request.version(2);

//request.fetchSourceContext(new FetchSourceContext(false)); //是否获取_source字段
//选择返回的字段
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

//也可写成这样
/*String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);*/


// 取stored字段
/*request.storedFields("message");
GetResponse getResponse = client.get(request);
String message = getResponse.getField("message").getValue();*/


//3、发送请求
GetResponse getResponse = null;
try {
// 同步请求
getResponse = client.get(request);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
logger.error("没有找到该id的文档" );
}
if (e.status() == RestStatus.CONFLICT) {
logger.error("获取时版本冲突了,请在此写冲突处理逻辑!" );
}
logger.error("获取文档异常", e);
}

//4、处理响应
if(getResponse != null) {
String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) { // 文档存在
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString(); //结果取成 String
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // 结果取成Map
byte[] sourceAsBytes = getResponse.getSourceAsBytes(); //结果取成字节数组

logger.info("index:" + index + " type:" + type + " id:" + id);
logger.info(sourceAsString);

} else {
logger.error("没有找到该id的文档" );
}
}


//异步方式发送获取文档请求
/*
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {

}

@Override
public void onFailure(Exception e) {

}
};
client.getAsync(request, listener);
*/

} catch (IOException e) {
e.printStackTrace();
}
}
}

批量索引文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class BulkDemo {

private static Logger logger = LogManager.getRootLogger();

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建批量操作请求
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("mess", "_doc", "1")
.source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("mess", "_doc", "2")
.source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("mess", "_doc", "3")
.source(XContentType.JSON,"field", "baz"));

/*
request.add(new DeleteRequest("mess", "_doc", "3"));
request.add(new UpdateRequest("mess", "_doc", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("mess", "_doc", "4")
.source(XContentType.JSON,"field", "baz"));
*/

// 2、可选的设置
/*
request.timeout("2m");
request.setRefreshPolicy("wait_for");
request.waitForActiveShards(2);
*/


//3、发送请求

// 同步请求
BulkResponse bulkResponse = client.bulk(request);


//4、处理响应
if(bulkResponse != null) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
//TODO 新增成功的处理

} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
//TODO 修改成功的处理

} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
//TODO 删除成功的处理
}
}
}


//异步方式发送批量操作请求
/*
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {

}

@Override
public void onFailure(Exception e) {

}
};
client.bulkAsync(request, listener);
*/

} catch (IOException e) {
e.printStackTrace();
}
}
}

搜索数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
public class SearchDemo {

private static Logger logger = LogManager.getRootLogger();

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建search请求
//SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("bank");
searchRequest.types("_doc");

// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

//构造QueryBuilder
/*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
.fuzziness(Fuzziness.AUTO)
.prefixLength(3)
.maxExpansions(10);
sourceBuilder.query(matchQueryBuilder);*/

sourceBuilder.query(QueryBuilders.termQuery("age", 24));
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//是否返回_source字段
//sourceBuilder.fetchSource(false);

//设置返回哪些字段
/*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);*/

//指定排序
//sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
//sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));

// 设置返回 profile
//sourceBuilder.profile(true);

//将请求体加入到请求中
searchRequest.source(sourceBuilder);

// 可选的设置
//searchRequest.routing("routing");

// 高亮设置
/*
HighlightBuilder highlightBuilder = new HighlightBuilder();
HighlightBuilder.Field highlightTitle =
new HighlightBuilder.Field("title");
highlightTitle.highlighterType("unified");
highlightBuilder.field(highlightTitle);
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
sourceBuilder.highlighter(highlightBuilder);*/


//加入聚合
/*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
.field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age")
.field("age"));
sourceBuilder.aggregation(aggregation);*/

//做查询建议
/*SuggestionBuilder termSuggestionBuilder =
SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
sourceBuilder.suggest(suggestBuilder);*/

//3、发送请求
SearchResponse searchResponse = client.search(searchRequest);


//4、处理响应
//搜索结果状态信息
RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();

//分片搜索情况
int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
// failures should be handled here
}

//处理搜索命中文档结果
SearchHits hits = searchResponse.getHits();

long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();

SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
// do something with the SearchHit

String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();

//取_source字段值
String sourceAsString = hit.getSourceAsString(); //取成json串
Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map对象
//从map中取字段值
/*
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
*/
logger.info("index:" + index + " type:" + type + " id:" + id);
logger.info(sourceAsString);

//取高亮结果
/*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get("title");
Text[] fragments = highlight.fragments();
String fragmentString = fragments[0].string();*/
}

// 获取聚合结果
/*
Aggregations aggregations = searchResponse.getAggregations();
Terms byCompanyAggregation = aggregations.get("by_company");
Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
Avg averageAge = elasticBucket.getAggregations().get("average_age");
double avg = averageAge.getValue();
*/

// 获取建议结果
/*Suggest suggest = searchResponse.getSuggest();
TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
for (TermSuggestion.Entry.Option option : entry) {
String suggestText = option.getText().string();
}
}
*/

//异步方式发送获查询请求
/*
ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse getResponse) {
//结果获取
}

@Override
public void onFailure(Exception e) {
//失败处理
}
};
client.searchAsync(searchRequest, listener);
*/

} catch (IOException e) {
logger.error(e);
}
}
}

highlight 高亮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class HighlightDemo {

private static Logger logger = LogManager.getRootLogger();

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建search请求
SearchRequest searchRequest = new SearchRequest("hl_test");

// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

//构造QueryBuilder
QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "lucene solr");
sourceBuilder.query(matchQueryBuilder);

//分页设置
/*sourceBuilder.from(0);
sourceBuilder.size(5); ;*/


// 高亮设置
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.requireFieldMatch(false).field("title").field("content")
.preTags("<strong>").postTags("</strong>");
//不同字段可有不同设置,如不同标签
/*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
highlightTitle.preTags("<strong>").postTags("</strong>");
highlightBuilder.field(highlightTitle);
HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
highlightContent.preTags("<b>").postTags("</b>");
highlightBuilder.field(highlightContent).requireFieldMatch(false);*/

sourceBuilder.highlighter(highlightBuilder);

searchRequest.source(sourceBuilder);

//3、发送请求
SearchResponse searchResponse = client.search(searchRequest);


//4、处理响应
if(RestStatus.OK.equals(searchResponse.status())) {
//处理搜索命中文档结果
SearchHits hits = searchResponse.getHits();
long totalHits = hits.getTotalHits();

SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();

//取_source字段值
//String sourceAsString = hit.getSourceAsString(); //取成json串
Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map对象
//从map中取字段值
/*String title = (String) sourceAsMap.get("title");
String content = (String) sourceAsMap.get("content"); */
logger.info("index:" + index + " type:" + type + " id:" + id);
logger.info("sourceMap : " + sourceAsMap);
//取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get("title");
if(highlight != null) {
Text[] fragments = highlight.fragments(); //多值的字段会有多个值
if(fragments != null) {
String fragmentString = fragments[0].string();
logger.info("title highlight : " + fragmentString);
//可用高亮字符串替换上面sourceAsMap中的对应字段返回到上一级调用
//sourceAsMap.put("title", fragmentString);
}
}

highlight = highlightFields.get("content");
if(highlight != null) {
Text[] fragments = highlight.fragments(); //多值的字段会有多个值
if(fragments != null) {
String fragmentString = fragments[0].string();
logger.info("content highlight : " + fragmentString);
//可用高亮字符串替换上面sourceAsMap中的对应字段返回到上一级调用
//sourceAsMap.put("content", fragmentString);
}
}
}
}

} catch (IOException e) {
logger.error(e);
}
}
}

suggest 查询建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
public class SuggestDemo {

private static Logger logger = LogManager.getRootLogger();

//词项建议拼写检查,检查用户的拼写是否错误,如果有错给用户推荐正确的词,appel->apple
public static void termSuggest() {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建search请求
//SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("mess");

// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

sourceBuilder.size(0);

//做查询建议
//词项建议
SuggestionBuilder termSuggestionBuilder =
SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
sourceBuilder.suggest(suggestBuilder);

searchRequest.source(sourceBuilder);

//3、发送请求
SearchResponse searchResponse = client.search(searchRequest);


//4、处理响应
//搜索结果状态信息
if(RestStatus.OK.equals(searchResponse.status())) {
// 获取建议结果
Suggest suggest = searchResponse.getSuggest();
TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
logger.info("text: " + entry.getText().string());
for (TermSuggestion.Entry.Option option : entry) {
String suggestText = option.getText().string();
logger.info(" suggest option : " + suggestText);
}
}
}
/*
"suggest": {
"my-suggestion": [
{
"text": "tring",
"offset": 0,
"length": 5,
"options": [
{
"text": "trying",
"score": 0.8,
"freq": 1
}
]
},
{
"text": "out",
"offset": 6,
"length": 3,
"options": []
},
{
"text": "elasticsearch",
"offset": 10,
"length": 13,
"options": []
}
]
}*/

} catch (IOException e) {
logger.error(e);
}
}

//自动补全,根据用户的输入联想到可能的词或者短语
public static void completionSuggester() {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建search请求
//SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("music");

// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

sourceBuilder.size(0);

//做查询建议
//自动补全
/*POST music/_search?pretty
{
"suggest": {
"song-suggest" : {
"prefix" : "lucene s",
"completion" : {
"field" : "suggest" ,
"skip_duplicates": true
}
}
}
}*/

SuggestionBuilder termSuggestionBuilder =
SuggestBuilders.completionSuggestion("suggest").prefix("lucene s")
.skipDuplicates(true);
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("song-suggest", termSuggestionBuilder);
sourceBuilder.suggest(suggestBuilder);

searchRequest.source(sourceBuilder);

//3、发送请求
SearchResponse searchResponse = client.search(searchRequest);


//4、处理响应
//搜索结果状态信息
if(RestStatus.OK.equals(searchResponse.status())) {
// 获取建议结果
Suggest suggest = searchResponse.getSuggest();
CompletionSuggestion termSuggestion = suggest.getSuggestion("song-suggest");
for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) {
logger.info("text: " + entry.getText().string());
for (CompletionSuggestion.Entry.Option option : entry) {
String suggestText = option.getText().string();
logger.info(" suggest option : " + suggestText);
}
}
}

} catch (IOException e) {
logger.error(e);
}
}

public static void main(String[] args) {
termSuggest();

logger.info("--------------------------------------");

completionSuggester();
}
}

aggregation 聚合分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class AggregationDemo {

private static Logger logger = LogManager.getRootLogger();

public static void main(String[] args) {
try (RestHighLevelClient client = InitDemo.getClient();) {

// 1、创建search请求
//SearchRequest searchRequest = new SearchRequest();
SearchRequest searchRequest = new SearchRequest("bank");

// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

sourceBuilder.size(0);

//加入聚合
//字段值项分组聚合
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
.field("age").order(BucketOrder.aggregation("average_balance", true));
//计算每组的平均balance指标
aggregation.subAggregation(AggregationBuilders.avg("average_balance")
.field("balance"));
sourceBuilder.aggregation(aggregation);

searchRequest.source(sourceBuilder);

//3、发送请求
SearchResponse searchResponse = client.search(searchRequest);

//4、处理响应
//搜索结果状态信息
if(RestStatus.OK.equals(searchResponse.status())) {
// 获取聚合结果
Aggregations aggregations = searchResponse.getAggregations();
Terms byAgeAggregation = aggregations.get("by_age");
logger.info("aggregation by_age 结果");
logger.info("docCountError: " + byAgeAggregation.getDocCountError());
logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
logger.info("------------------------------------");
for(Bucket buck : byAgeAggregation.getBuckets()) {
logger.info("key: " + buck.getKeyAsNumber());
logger.info("docCount: " + buck.getDocCount());
logger.info("docCountError: " + buck.getDocCountError());
//取子聚合
Avg averageBalance = buck.getAggregations().get("average_balance");

logger.info("average_balance: " + averageBalance.getValue());
logger.info("------------------------------------");
}
//直接用key 来去分组
/*Bucket elasticBucket = byCompanyAggregation.getBucketByKey("24");
Avg averageAge = elasticBucket.getAggregations().get("average_age");
double avg = averageAge.getValue();*/

}

} catch (IOException e) {
logger.error(e);
}
}
}

各种查询对应的QueryBuilder:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html

各种聚合对应的AggregationBuilder:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html

参考:https://www.cnblogs.com/leeSmall/p/9218779.html

Elasticsearch聚合分析

发表于 2019-03-16 | 分类于 Elasticsearch

聚合分析简介

聚合分析是数据库中重要的功能特性,完成对一个查询的数据集中数据的聚合计算,如:找出某字段(或计算表达式的结果)的最大值、最小值,计算和、平均值等。ES作为搜索引擎兼数据库,同样提供了强大的聚合分析能力。

对一个数据集求最大、最小、和、平均值等指标的聚合,在ES中称为指标聚合 metric

而关系型数据库中除了有聚合函数外,还可以对查询出的数据进行分组group by,再在组上进行指标聚合。在 ES 中group by 称为分桶,桶聚合 bucketing

ES中还提供了矩阵聚合(matrix)、管道聚合(pipleline),但还在完善中。

ES聚合分析查询的写法

在查询请求体中以aggregations节点按如下语法定义聚合分析(aggregations 也可简写为 aggs):

1
2
3
4
5
6
7
8
9
10
"aggregations" : {
"<aggregation_name>" : { <!--聚合的名字 -->
"<aggregation_type>" : { <!--聚合的类型 -->
<aggregation_body> <!--聚合体:对哪些字段进行聚合 -->
}
[,"meta" : { [<meta_data_body>] } ]? <!--元 -->
[,"aggregations" : { [<sub_aggregation>]+ } ]? <!--在聚合里面在定义子聚合 -->
}
[,"<aggregation_name_2>" : { ... } ]*<!--聚合的名字 -->
}

聚合计算的值可以取字段的值,也可是脚本计算的结果。

指标聚合

max min sum avg

最大值:

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?
{
"size": 0,
"aggs": {
"masssbalance": {
"max": {
"field": "balance"
}
}
}
}

年龄为24岁的客户中的余额最大值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
POST /bank/_search?
{
"size": 20,
"query": {
"match": {
"age": 24
}
},
"sort": [
{
"balance": {
"order": "desc"
}
}
],
"aggs": {
"max_balance": {
"max": {
"field": "balance"
}
}
}
}

值来源于脚本,查询所有客户的平均年龄是多少,并对平均年龄加10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
POST /bank/_search?size=0
{
"aggs": {
"avg_age": {
"avg": {
"script": {
"source": "doc.age.value"
}
}
},
"avg_age10": {
"avg": {
"script": {
"source": "doc.age.value + 10"
}
}
}
}
}

指定field,在脚本中用_value 取字段的值

1
2
3
4
5
6
7
8
9
10
11
12
13
POST /bank/_search?size=0
{
"aggs": {
"sum_balance": {
"sum": {
"field": "balance",
"script": {
"source": "_value * 1.03"
}
}
}
}
}

为没有值字段指定值。如未指定,缺失该字段值的文档将被忽略。

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?size=0
{
"aggs": {
"avg_age": {
"avg": {
"field": "age",
"missing": 18
}
}
}
}

文档计数 count

统计银行索引bank下年龄为24的文档数量

1
2
3
4
5
6
7
8
POST /bank/accounts/_count
{
"query": {
"match": {
"age" : 24
}
}
}

value_count 统计某字段有值的文档数

1
2
3
4
5
6
7
8
9
10
POST /bank/_search?size=1
{
"aggs": {
"age_count": {
"value_count": {
"field": "age"
}
}
}
}

cardinality 值去重计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST /bank/_search?size=1
{
"aggs": {
"age_count": {
"cardinality": {
"field": "age"
}
},
"state_count": {
"cardinality": {
"field": "state.keyword"
}
}
}
}

说明:state的使用它的keyword版

stats 统计 count max min avg sum 5个值

1
2
3
4
5
6
7
8
9
10
POST /bank/_search?size=0
{
"aggs": {
"age_stats": {
"stats": {
"field": "age"
}
}
}
}

Extended stats

高级统计,比stats多4个统计结果: 平方和、方差、标准差、平均值加/减两个标准差的区间

1
2
3
4
5
6
7
8
9
10
POST /bank/_search?size=0
{
"aggs": {
"age_stats": {
"extended_stats": {
"field": "age"
}
}
}
}

Percentiles 占比百分位对应的值统计

对指定字段(脚本)的值按从小到大累计每个值对应的文档数的占比(占所有命中文档数的百分比),返回指定占比比例对应的值。默认返回[ 1, 5, 25, 50, 75, 95, 99 ]分位上的值。如下中间的结果,可以理解为:占比为50%的文档的age值 <= 31,或反过来:age<=31的文档数占总命中文档数的50%

1
2
3
4
5
6
7
8
9
10
POST /bank/_search?size=0
{
"aggs": {
"age_percents": {
"percentiles": {
"field": "age"
}
}
}
}

结果说明:占比为50%的文档的age值 <= 31,或反过来:age<=31的文档数占总命中文档数的50%

指定分位值

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?size=0
{
"aggs": {
"age_percents": {
"percentiles": {
"field": "age",
"percents" : [95, 99, 99.9]
}
}
}
}

Percentiles rank 统计值小于等于指定值的文档占比

统计年龄小于25和30的文档的占比,和第7项相反

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST /bank/_search?size=0
{
"aggs": {
"gge_perc_rank": {
"percentile_ranks": {
"field": "age",
"values": [
25,
30
]
}
}
}
}

结果说明:年龄小于25的文档占比为26.1%,年龄小于30的文档占比为49.2%,

Geo Bounds aggregation 求文档集中的地理位置坐标点的范围

参考官网链接:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-geobounds-aggregation.html

Geo Centroid aggregation 求地理位置中心点坐标值

参考官网链接:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-geocentroid-aggregation.html

桶聚合

Terms Aggregation 根据字段值项分组聚合

1
2
3
4
5
6
7
8
9
10
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age"
}
}
}
}

结果说明:

1
2
"doc_count_error_upper_bound": 0:文档计数的最大偏差值
"sum_other_doc_count": 463:未返回的其他项的文档数

默认情况下返回按文档计数从高到低的前10个分组。

年龄为31的文档有61个,年龄为39的文档有60个

** size 指定返回多少个分组 **

指定返回20个分组

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"size": 20
}
}
}
}

每个分组上显示偏差值

1
2
3
4
5
6
7
8
9
10
11
12
13
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"size": 5,
"shard_size": 20,
"show_term_doc_count_error": true
}
}
}
}

** shard_size ** 指定每个分片上返回多少个分组

shard_size 的默认值为:索引只有一个分片:= size,多分片:= size * 1.5 + 10

1
2
3
4
5
6
7
8
9
10
11
12
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"size": 5,
"shard_size": 20
}
}
}
}

** order ** 指定分组的排序

根据文档计数排序

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"order" : { "_count" : "asc" }
}
}
}
}

根据分组值排序

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"order" : { "_key" : "asc" }
}
}
}
}

取分组指标值排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"order": {
"max_balance": "asc"
}
},
"aggs": {
"max_balance": {
"max": {
"field": "balance"
}
},
"min_balance": {
"min": {
"field": "balance"
}
}
}
}
}
}

筛选分组-正则表达式匹配值

1
2
3
4
5
6
7
8
9
10
11
12
GET /_search
{
"aggs" : {
"tags" : {
"terms" : {
"field" : "tags",
"include" : ".*sport.*",
"exclude" : "water_.*"
}
}
}
}

筛选分组-指定值列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET /_search
{
"aggs" : {
"JapaneseCars" : {
"terms" : {
"field" : "make",
"include" : ["mazda", "honda"]
}
},
"ActiveCarManufacturers" : {
"terms" : {
"field" : "make",
"exclude" : ["rover", "jensen"]
}
}
}
}

根据脚本计算值分组

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"script" : {
"source": "doc['genre'].value",
"lang": "painless"
}
}
}
}
}

缺失值处理

1
2
3
4
5
6
7
8
9
10
11
GET /_search
{
"aggs" : {
"tags" : {
"terms" : {
"field" : "tags",
"missing": "N/A"
}
}
}
}

filter Aggregation 对满足过滤查询的文档进行聚合计算

在查询命中的文档中选取符合过滤条件的文档进行聚合,先过滤再聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"filter": {"match":{"gender":"F"}},
"aggs": {
"avg_age": {
"avg": {
"field": "age"
}
}
}
}
}
}

Filters Aggregation 多个过滤组聚合计算

准备数据:

1
2
3
4
5
6
7
PUT /logs/_doc/_bulk?refresh
{"index":{"_id":1}}
{"body":"warning: page could not be rendered"}
{"index":{"_id":2}}
{"body":"authentication error"}
{"index":{"_id":3}}
{"body":"warning: connection timed out"}

获取组合过滤后聚合的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
GET logs/_search
{
"size": 0,
"aggs": {
"messages": {
"filters": {
"filters": {
"errors": {
"match": {
"body": "error"
}
},
"warnings": {
"match": {
"body": "warning"
}
}
}
}
}
}
}

为其他值组指定key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
GET logs/_search
{
"size": 0,
"aggs": {
"messages": {
"filters": {
"other_bucket_key": "other_messages",
"filters": {
"errors": {
"match": {
"body": "error"
}
},
"warnings": {
"match": {
"body": "warning"
}
}
}
}
}
}
}

Range Aggregation 范围分组聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
POST /bank/_search?size=0
{
"aggs": {
"age_range": {
"range": {
"field": "age",
"ranges": [
{
"to": 25
},
{
"from": 25,
"to": 35
},
{
"from": 35
}
]
},
"aggs": {
"bmax": {
"max": {
"field": "balance"
}
}
}
}
}
}

为组指定key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
POST /bank/_search?size=0
{
"aggs": {
"age_range": {
"range": {
"field": "age",
"keyed": true,
"ranges": [
{
"to": 25,
"key": "Ld"
},
{
"from": 25,
"to": 35,
"key": "Md"
},
{
"from": 35,
"key": "Od"
}
]
}
}
}
}

Date Range Aggregation 时间范围分组聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
POST /bank/_search?size=0
{
"aggs": {
"range": {
"date_range": {
"field": "date",
"format": "MM-yyy",
"ranges": [
{
"to": "now-10M/M"
},
{
"from": "now-10M/M"
}
]
}
}
}
}

Date Histogram Aggregation 时间直方图(柱状)聚合

就是按天、月、年等进行聚合统计。可按 year (1y), quarter (1q), month (1M), week (1w), day (1d), hour (1h), minute (1m), second (1s) 间隔聚合或指定的时间间隔聚合。

1
2
3
4
5
6
7
8
9
10
11
POST /bank/_search?size=0
{
"aggs": {
"sales_over_time": {
"date_histogram": {
"field": "date",
"interval": "month"
}
}
}
}

Missing Aggregation 缺失值的桶聚合

1
2
3
4
5
6
7
8
POST /bank/_search?size=0
{
"aggs" : {
"account_without_a_age" : {
"missing" : { "field" : "age" }
}
}
}

Geo Distance Aggregation 地理距离分区聚合

参考官网链接:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-geodistance-aggregation.html

Elasticsearch搜索详解

发表于 2019-03-15 | 分类于 Elasticsearch

查询建议

查询建议:查询建议,为用户提供良好的使用体验。主要包括: 拼写检查; 自动建议查询词(自动补全)。

ES中查询建议的API

查询建议也是使用_search端点地址。在DSL中suggest节点来定义需要的建议查询

示例1:定义单个建议查询词

定义查询建议

1
2
3
4
5
6
7
8
"suggest" : { 
"my-suggestion" : { <!-- 一个建议查询名 -->
"text" : "tring out Elasticsearch", <!-- 查询文本 -->
"term" : { <!-- 使用词项建议器 -->
"field" : "message" <!-- 指定在哪个字段上获取建议词 -->
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
POST bank/_search
{
"query" : {
"match": {
"email": "virginiaayala@filodyne.com"
}
},
"suggest" : {
"my-suggestion" : {
"text" : "virginiaayala@filodyne.com",
"term" : {
"field" : "email"
}
}
}
}

示例2:定义多个建议查询词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
POST bank/_search
{
"suggest": {
"my-suggest-1" : {
"text" : "virginiaayala@filodyne.com",
"term" : {
"field" : "email"
}
},
"my-suggest-2" : {
"text" : "Nicholson",
"term" : {
"field" : "city"
}
}
}
}

示例3:多个建议查询可以使用全局的查询文本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
POST _search
{
"suggest": {
"text" : "virginiaayala@filodyne.com",
"my-suggest-1" : {
"term" : {
"field" : "email"
}
},
"my-suggest-2" : {
"term" : {
"field" : "city"
}
}
}
}

Suggester 介绍

Term suggester

term 词项建议器,对给入的文本进行分词,为每个词进行模糊查询提供词项建议。对于在索引中存在词默认不提供建议词,不存在的词则根据模糊查询结果进行排序后取一定数量的建议词。

常用的建议选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
POST bank/_search
{
"query" : {
"match": {
"email": "virginiaayala@filodyne.com"
}
},
"suggest" : {
"my-suggestion" : {
"text" : "virginiaayala@filodyne.com",
"term" : {
"field" : "email"
}
}
}
}

phrase suggester

phrase 短语建议,在term的基础上,会考量多个term之间的关系,比如是否同时出现在索引的原文里,相邻程度,以及词频等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST bank/_search
{
"query" : {
"match_all": {}
},
"suggest" : {
"my-suggestion11" : {
"text" : "virginiaayala@filodyne.com",
"phrase" : {
"field" : "email"
}
}
}
}

Completion suggester 自动补全

针对自动补全场景而设计的建议器。此场景下用户每输入一个字符的时候,就需要即时发送一次查询请求到后端查找匹配项,在用户输入速度较高的情况下对后端响应速度要求比较苛刻。因此实现上它和前面两个Suggester采用了不同的数据结构,索引并非通过倒排来完成,而是将analyze过的数据编码成FST和索引一起存放。对于一个open状态的索引,FST会被ES整个装载到内存里的,进行前缀查找速度极快。但是FST只能用于前缀查找,这也是Completion Suggester的局限所在。

官网链接:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-suggesters-completion.html

为了使用自动补全,索引中用来提供补全建议的字段需特殊设计,字段类型为 completion。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT music
{
"mappings": {
"_doc" : {
"properties" : {
"suggest" : { <!-- 用于自动补全的字段 -->
"type" : "completion"
},
"title" : {
"type": "keyword"
}
}
}
}
}

Input 指定输入词 Weight 指定排序值(可选)

1
2
3
4
5
6
7
PUT music/_doc/1?refresh
{
"suggest" : {
"input": [ "Nevermind", "Nirvana" ],
"weight" : 34
}
}

指定不同的排序值:

1
2
3
4
5
6
7
8
9
10
11
12
PUT music/_doc/1?refresh
{
"suggest" : [
{
"input": "Nevermind",
"weight" : 10
},
{
"input": "Nirvana",
"weight" : 3
}
]}

放入一条重复数据

1
2
3
4
5
6
7
PUT music/_doc/2?refresh
{
"suggest" : {
"input": [ "Nevermind", "Nirvana" ],
"weight" : 20
}
}

查询建议根据前缀查询:

1
2
3
4
5
6
7
8
9
10
11
POST music/_search?pretty
{
"suggest": {
"song-suggest" : {
"prefix" : "nir",
"completion" : {
"field" : "suggest"
}
}
}
}

对建议查询结果去重

1
2
3
4
5
6
7
8
9
10
POST music/_search?pretty
{
"suggest": {
"song-suggest" : {
"prefix" : "nir",
"completion" : {
"field" : "suggest",
"skip_duplicates": true
}
} }}

查询建议文档存储短语

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PUT music/_doc/3?refresh
{
"suggest" : {
"input": [ "lucene solr", "lucene so cool","lucene elasticsearch" ],
"weight" : 20
}
}

PUT music/_doc/4?refresh
{
"suggest" : {
"input": ["lucene solr cool","lucene elasticsearch" ],
"weight" : 10
}
}

查询:

1
2
3
4
5
6
7
8
9
10
11
12
POST music/_search?pretty
{
"suggest": {
"song-suggest" : {
"prefix" : "lucene s",
"completion" : {
"field" : "suggest" ,
"skip_duplicates": true
}
}
}
}
上一页1…789…25下一页
初晨

初晨

永远不要说你知道本质,更别说真相了。

249 日志
46 分类
109 标签
近期文章
  • WebSocket、Socket、TCP、HTTP区别
  • Springboot项目的接口防刷
  • 深入理解Volatile关键字及其实现原理
  • 使用vscode搭建个人笔记环境
  • HBase介绍安装与操作
© 2018 — 2020 Copyright
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4