简

人生短暂,学海无边,而大道至简。


  • 首页

  • 归档

  • 分类

  • 标签

RabbitMQ常见数据问题

发表于 2018-04-13 | 分类于 rabbitmq
1、生产者丢失数据

RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction:开启事物(channel.txSelect()),回滚(channel.txRollback()),提交事物(channel.txCommit())。
这种方式缺点:吞吐量下降。

confirm模式:channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID,当消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者,就能知道消息正确发送到队列。
如果rabiitMQ没能处理该消息,则会发送一个Nack消息说明给你,你可以进行重试操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void send(String orderId) {
JSONObject jsonObect = new JSONObject();
jsonObect.put("orderId", orderId);
String msg = jsonObect.toJSONString();
System.out.println("msg:" + msg);
// 封装消息
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(orderId).build();
// 构建回调返回的数据
CorrelationData correlationData = new CorrelationData(orderId);
// 发送消息 回调
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);
}
// 生产消息确认机制 生产者往服务器端发送消息的时候 采用应答机制
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String orderId = correlationData.getId(); //全局ID
System.out.println("消息id:" + correlationData.getId());
System.out.println(cause);
if (ack) { //消息发送成功
System.out.println("消息发送确认成功");
} else {
//重试机制
send(orderId);
System.out.println("消息发送确认失败:" + cause);
}
}

配置文件中加上:

1
2
3
#回调
publisher-confirms: true
publisher-returns: true
3、消费者丢失数据

启用手动确认模式可以解决

  • 自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    spring:
    rabbitmq:
    host: 192.168.6.238
    port: 5672
    username: guest
    password: guest
    listener:
    simple:
    retry:
    #开启消费者(程序出现异常的情况下会)进行重试
    enabled: true
    #最大重试次数
    max-attempts: 5
    #重试间隔次数
    initial-interval: 3000
    #重试次数过了是否丢失消息,false不丢弃时需要写相应代码将该消息加入死信队列
    #生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
    default-requeue-rejected: false
  • 手动确认模式
    指定Acknowledge的模式:

    1
    2
    3
    4
    listener:
    direct:
    #开启手动ack
    acknowledge-mode: manual

    如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的
    如果消费者没有设置手动应答模式,并且设置了重试,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    try {
    int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
    if (insertDistribute > 0) {
    // 手动签收消息,通知mq服务器端删除该消息,是否批量.true:将一次性ack所有小于deliveryTag的消息。
    //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    //默认情况下,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,又继续异常,回滚,反复。所以加到对队尾
    //重新发送消息到队尾
    channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(new Object()));
    }
    } catch (Exception e) {
    e.printStackTrace();
    //丢弃该消息
    channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
    //ack返回false,并重新回到队列
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
  • 不确认模式,acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

2、消息队列丢数据

如果需要持久化,持久化配置可以和confirm配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。如果消息持久化磁盘之前,rabbitMQ挂了,生产者收不到Ack信号,会自动重发。

1
2
3
4
5
6
7
8
9
10
11
12
//声明队列
@Bean
public Queue directCreateOrdersQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 30000);//30秒自动删除
/* 第一个参数:队列名称
第二个参数:是否持久化 durable=true。
第三个参数:排他队列exclusive,如果一个队列被声明为排他队列,连接的时候可见,断开就删除了,即使是持久化队列也一样。
第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,临时队列。*/
Queue queue = new Queue(ORDER_CREATE_QUEUE, true, false, true, arguments);
return queue;
}

发消息的时候

1
2
3
4
5
6
// 封装消息
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).
setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE).//持久化设置
setExpiration("2019-12-10 11:22:33").//设置到期时间
setContentEncoding("utf-8").setMessageId(orderId).build();
this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

发消息也声明持久消息,这样rabbitMQ消息还没有持久化到硬盘时就算挂了,重启后也能通过引入mirrored-queue即镜像队列恢复数据,但不能保证消息百分百不丢失(整个集群都挂掉)

rabbitmq channel参数详解

发表于 2018-04-13 | 分类于 rabbitmq

1、Channel

1.1 channel.exchangeDeclare():

type:有direct、fanout、topic三种

durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。原文:true if we are declaring a durable exchange (the exchange will survive a server restart)

autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。原文1:true if the server should delete the exchange when it is no longer in use。

1.2 chanel.basicQos()

prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别
备注:据说prefetchSize 和global这两项,rabbitmq没有实现

1.3 channel.basicPublish()

routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用

mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

1.4 channel.basicAck();

deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

1.5 channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列

channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息

1.6 channel.basicConsume(QUEUE_NAME, true, consumer);

autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答

1.7 chanel.exchangeBind()

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
用于通过绑定bindingKey将queue到Exchange,之后便可以进行消息接收

1.8 channel.queueDeclare(QUEUE_NAME, false, false, false, null);

durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it.

Rabbit实现最终一致性

发表于 2018-04-13 | 分类于 rabbitmq

一、分布式事务最终一致性思想

如果生产者投递消息到MQ服务器成功

场景1 如果消费者消费消息失败了

生产者是不需要回滚事务。 消费者采用手动ack应答方式  进行补偿机制,补偿的过程中注意 幂等性 问题。

分布式事务中遵循base理论 遵循cpa理论

如何确保生产者发送消息一定发送到MQ消息服务器端成功? confirm机制 确认应答机制

场景2 如果生产者发送消息到MQ服务器端失败

使用生产者重试机制进行发消息

二、分段式事务的补偿机制

分段式事务一般做法就是把需求任务分段式地完成,通过事务补偿机制来保证业务最终执行成功,补偿机制一般可以归类为2种:

1 )定时任务补偿:

  通过定时任务去跟进后续任务,根据不同的状态表确定下一步的操作,从而保证业务最终执行成功,

  这种办法可能会涉及到很多的后台服务,维护起来也会比较麻烦,这是应该是早期比较流行的做法

2) 消息补偿:

  通过消息中间件触发下一段任务,既通过实时消息通知下一段任务开始执行,执行完毕后的消息回发通知来保证业务最终完成;

  当然这也是异步进行的,但是能保证数据最终的完整性、一致性,也是近几年比较热门的做法

Rabbit介绍和原理及安装

发表于 2018-04-13 | 分类于 rabbitmq

1、简介

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

2、RabbitMQ使用场景

RabbitMQ他是一个消息中间件,说道消息中间件【最主要的作用:信息的缓冲区】还是的从应用场景来看下:

系统集成与分布式系统的设计

各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。

举个例子:现在医院有两个科“看病科”和“住院科”在之前他们之间是没有任何关系的,如果你在“看病课”看完病后注册的信息和资料,到住院科后还得重新注册一遍?那现在要改革,你看完病后可以直接去住院科那两个系统之间需要打通怎么办?这里就可以使用我们的消息中间件了。

异步任务处理结果回调的设计

举个例子:记录日志,假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。这样不需要同步的写入日志了NICE

并发请求的压力高可用性设计

举个例子:比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。

3、模型

RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

20200413160210

https://www.cnblogs.com/ityouknow/p/6120544.html

左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
中间即是 RabbitMQ,其中包括了 交换机 和 队列。
右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

虚拟主机: 一个虚拟主机持有一组交换机、队列和绑定。

为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

交换机: Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。

这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

绑定: 也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:

Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
Topic:按规则转发消息(最灵活)
Headers:设置 header attribute 参数类型的交换机
Fanout:转发消息到所有绑定队列

Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
路由模式必须包含一个 星号(),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。

具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend(“testTopicExchange”,”key1.a.c.key2”, “ this is RabbitMQ!”);

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

*表示一个词.
#表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.
在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

安装

安装rabbitmq

安装过程 参考 (http://www.rabbitmq.com/install-rpm.html)目前安装包被包含在rpm仓库中是epel库

yum -y install epel-release.noarch

查看是否存在rabbitmq 然后安装

yum search rabbitmq-server  
yum -y install rabbitmq-server  

查看安装包

tail -1000 /var/log/yum.log

查看rabbitmq-server被安装的所有文件的位置

启动

1
2
3
4
5
 [root@localhost ~]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
[root@localhost ~]# netstat -aon | grep 5672
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN off (0.00/0/0)
tcp6 0 0 :::5672 :::* LISTEN off (0.00/0/0)

rabbitmq默认安装后 会添加账号rabbitmq 默认以该账号运行

[root@localhost ~]# more /etc/passwd | grep rabbitmq
rabbitmq:x:994:990:RabbitMQ messaging server:/var/lib/rabbitmq:/sbin/nologin

rabbitmq默认提供了一个web管理工具(rabbitmq_management)参考官方http://www.rabbitmq.com/management.html 默认已经安装 是一个插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[root@localhost ~]# cd /usr/lib/rabbitmq/
[root@localhost rabbitmq]# ./bin/rabbitmq-plugins list
[ ] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[ ] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[ ] rabbitmq_management 3.3.5
[ ] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[ ] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine 1.10.3-rmq3.3.5-gite9359c7

启用插件

1
2
3
4
5
6
7
8
9
[root@localhost rabbitmq]# ./bin/rabbitmq-plugins  enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.

重启后浏览

[root@localhost rabbitmq]# service rabbitmq-server restart

docker上安装

docker pull rabbitmq:3.7.7-management

docker run -d –name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v pwd/data:/var/lib/rabbitmq –hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9 说明:

-d 后台运行容器;

–name 指定容器名;

-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);

-v 映射目录或文件;

–hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);

-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

5、使用命令:docker ps 查看正在运行容器

Redis持久化-主从同步和安全性-压测

发表于 2018-04-11 | 分类于 Redis

持久化方式

1、快照方式(RDB 默认)

这种方式就是将内存中数据以快照的方式写入到二进制文件中,默认的文件名为dump.rdb。

客户端也可以使用 e save 或者 e bgsave 命令通知 redis 做一次快照持久化。save 操作是在主线程中保存快照的,由于 redis 是用一个主线程来处理所有客户端的请求,这种方式会阻塞所有客户端请求。所以不推荐使用。

另一点需要注意的是,每次快照持久化都是将内存数据完整写入到磁盘一次,并不是增量的只同步增量数据。如果数据量大的话,写操作会比较多,必然会引起大量的磁盘 IO 操作,可能会严重影响性能。

注意:由于快照方式是在一定间隔时间做一次的,所以如果 redis 意外当机的话,就会丢失最后一次快照后的所有数据修改。

2、日志追加方式(AOF)

这种方式 redis 会将每一个收到的写命令都通过 write 函数追加到文件中(默认appendonly.aof)。当 redis 重启时会通过重新执行文件中保存的写命令来在内存中重建整个数据库的内容。

当然由于操作系统会在内核中缓存 write 做的修改,所以可能不是立即写到磁盘上。这样的持久化还是有可能会丢失部分修改。不过我们可以通过配置文件告诉redis 我们想要通过 fsync 函数强制操作系统写入到磁盘的时机。

有三种方式如下(默认是 :每秒 fsync 一次)
appendonly yes #启用日志追加持久化方式
#appendfsync always #每次收到写命令就立即强制写入磁盘,最慢的,但是保证完全的持久化,不推荐使用
appendfsync everysec //每秒钟强制写入磁盘一次,在性能和持久化方面做了很好的折中,推荐
#appendfsync no //完全依赖操作系统,性能最好,持久化没保证

日志追加方式同时带来了另一个问题。持久化文件会变的越来越大。例如我们调用 incrtest 命令 100 次,文件中必须保存全部 100 条命令,其实有 99 条都是多余的。因为要恢复数据库状态其实文件中保存一条 set test 100 就够了。

为了压缩这种持久化方式的日志文件 ,redis 提供了 bgrewriteaof 命令。收到此命令 redis 将使用与快照类似的方式将内存中的数据以命令的方式保存到临时文件中,最后替换原来的持久化日志文件。

虚拟内存

redis 没有使用操作系统提供的虚拟内存机制而是自己在用户态实现了自己的虚拟内存机制

对于redis 这样的内存数据库,内存总是不够用的。使用虚拟内存技术把那些不经常访问的数据交换到磁盘上。

vm-enabled yes #开启虚拟内存功能
vm-swap-file /tmp/redis.swap #交换出来 value 保存的文件路径/tmp/redis.swap
vm-max-memory 268435456 #redis 使用的最大内存上限(256MB),超过上限后redis 开始交换 value 到磁盘 swap 文件中。建议设置为系统空闲内存的 60%-80%
vm-page-size 32 #每个 redis 页的大小 32 个字节
vm-pages 134217728 #最多在文件中使用多少个页,交换文件的大小 =(vm-page-size * vm-pages)4GB
vm-max-threads 8 #用于执行 value 对象换入换出的工作线程数量。0表示不使用工作线程

redis 的虚拟内存在设计上为了保证 key 的查询速度,只会将 value 交换到 swap 文件中。 如果是由于太多 key 很小的 value 造成的内存问题,那么 redis 的虚拟内存并不能解决问题 。

和操作系统一样 redis 也是按页来交换对象的。redis 规定同一个页只能保存一个对象。但是一个对象可以保存在多个页中。在 redis 使用的内存没超过 vm-max-memory 之前是不会交换任何 value 的。当超过最大内存限制后,redis 会选择把较老的对象交换到 swap文件中去。如果两个对象一样老会优先交换比较大的对象,精确的交换计算公 式swappability = age*log(size_in_memory)。

redis 虚拟内存工作方式

当 vm-max-threads 设为 0 时(阻塞方式)

换出:主线程定期检查发现内存超出最大上限后,会直接以阻塞的方式,将选中的对象保存到 swap文件中,并释放对象占用的内存空间,此过程会一直重复直到下面条件满足

1.内存使用降到最大限制以下
2.swap 文件满了。
3.几乎全部的对象都被交换到磁盘了

换入:当有客户端请求已经被换出的 value 时,主线程会以阻塞的方式从 swap 文件中加载对应的value 对象,加载时此时会阻塞所有客户端。然后处理该客户端的请求

当 vm-max-threads 大于 0 时( ( 工作线程方式) )

换出:当主线程检测到使用内存超过最大上限,会将选中要交换的对象信息放到一个队列中交给工作线程后台处理,主线程会继续处理客户端请求。

换入:如果有客户端请求的 key 已经被换出了,主线程会先阻塞发出命令的客户端,然后将加载对象的信息放到一个队列中,让工作线程去加载。加载完毕后工作线程通知主线程。主线程再执行客户端的命令。这种方式只阻塞请求的 value 是已经被换出 key 的客户端。总的来说阻塞方式的性能会好一些,因为不需要线程同步、创建线程和恢复被阻塞的客户端等开销。但是也相应的牺牲了响应性。工作线程方式主线程不会阻塞在磁盘 IO 上,所以响应性更好。如果我们的应用不太经常发生换入换出,而且也不太在意有点延迟的话推荐使用阻塞方式。

主从主要是为了实现读写分离

Redis 支持将数据同步到多台从库上,这种特性对提高 读取性能非常有益。

1) master 可以有多个 slave。
2) 除了多个 slave 连到相同的 master 外,slave 也可以连接其它 slave 形成图状结构。
3) 主从复制不会阻塞 master。也就是说当一个或多个 slave 与 master 进行初次同步数据时,master 可以继续处理客户端发来的请求。相反 slave 在初次同步数据时则会阻塞不能处理客户端的请求。
4) 主从复制可以用来提高系统的可伸缩性,我们可以用多个 slave 专门用于客户端的读请求,比如 sort 操作可以使用 slave 来处理。也可以用来做简单的数据冗余。
5) 可以在 master 禁用数据持久化,只需要注释掉 master 配置文件中的所有 save 配置,然后只在 slave 上配置数据持久化。

Redis 主从复制的过程

当设置好 slave 服务器后,slave 会建立和 master 的连接,然后发送 sync 命令。无论第一次同步建立的连接还是连接断开后的重新连接,master 都会启动一个后台进程,将数据库快照保存到文件中,同时 master 主进程会开始收集新的写命令并缓存起来。

后台进程完成写文件后,master 就发送文件给 slave,slave 将文件保存到磁盘上,然后加载到内存恢复数据库快照到 slave 上。接着 master 就会把缓存的命令转发给 slave。而且后续 master 收到的写命令都会通过开始建立的连接发送给slave。

从master到slave的同步数据的命令和从 客户端发送的命令使用相同的协议格式。当 master 和 slave 的连接断开时 slave 可以自动重新建立连接。如果 master 同时收到多个 slave 发来的同步连接命令,只会启动一个进程来写数据库镜像,然后发送给所有 slave。
配置 slave 服务器很简单,只需要在配置文件中加入如下配置。

1
2
3
slaveof 192.168.1.1 6379 #指定 master 的 ip 和端口
slaveof <materip><materport>
materauth <master-password>

安全性

Redis设置密码:
配置文件中:
#requirepass foobared
requirepass *****
重启,再次进入:keys *
    提示没有权限
输入:auth ***
重新进入,这样每次都有可以重新输入密码
可以在登录客户端的时候登录授权:
redis-cli -a ***

发布和订阅:
使用subscribe进行订阅监听
使用publish惊醒发布消息广播。

压力测试

Redis 自带了一个叫redis-benchmark的工具来模拟 N 个客户端同时发出 M 个请求。

使用./redis-benchmark –help查看参数

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 运行默认配置下的测试
$ redis-benchmark

# 指定并发数20,总请求数为10W,redis server主机IP为192.168.1.1
$ redis-benchmark -h 192.168.1.1 -p 6379 -n 100000 -c 20

# 测试SET随机数性能
$ redis-benchmark -t set -n 1000000 -r 100000000

# 测试结果输出到csv
$ redis-benchmark -t ping,set,get -n 100000 --csv

# 执行特定命令下的测试
$ redis-benchmark -r 10000 -n 10000 eval 'return redis.call("ping")' 0

# 测试list入队的速度
$ redis-benchmark -r 10000 -n 10000 lpush mylist __rand_int__
-t 选择你想测试的命令,比如redis-benchmark -t set
-p 指定port redis-benchmark -p 6379
-l 一直循环
-c 指定客户端数量
-n 指定request数量
-q 运行在安静模式中
-r 随机  -比如想设置 10 万随机 key 连续 SET 1 万次  redis-benchmark -t set -r 100000 -n 10000

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@localhost bin]# ./redis-benchmark -h 192.168.1.105 -p 6001 -t set -n 100000 -r 1000000
====== SET ======
100000 requests completed in 1.58 seconds
50 parallel clients
3 bytes payload
keep alive: 1

90.70% <= 1 milliseconds
99.50% <= 2 milliseconds
99.91% <= 3 milliseconds
99.94% <= 4 milliseconds
99.95% <= 62 milliseconds
100.00% <= 63 milliseconds
100.00% <= 63 milliseconds
63492.06 requests per second

Redis数据类型和操作指令

发表于 2018-04-10 | 分类于 Redis

数据类型

key

Redis 的 key 是字符串类型,但是 key 中不能包括边界字符,由于 key 不是 binary safe的字符串,所以像”my key”和”mykey\n”这样包含空格和换行的 key 是不允许的。

exits key 检测指定 key 是否存在,返回 1 表示存在,0 不存在
del key1 key2 ...... keyN 删除给定 key,返回删除 key 的数目,0 表示给定 key 都不存在
type key 返回给定 key 值的类型。返回 none 表示 key 不存在,string 字符类型,list 链表类型 set 无序集合类型......
keys pattern 返回匹配指定模式的所有 key
randomkey 返回从当前数据库中随机选择的一个 key,如果当前数据库是空的,返回空串
rename oldkey newkey 重命名一个 key,如果 newkey 存在,将会被覆盖,返回 1 表示成功,0 失败。可能是 oldkey 不存在或者和 newkey 相同。
renamenx oldkey newkey 同上,但是如果 newkey 存在返回失败。
expire key seconds 为 key 指定过期时间,单位是秒。返回 1 成功,0 表示 key 已经设置过过期时间或者不存在。
ttl key 返回设置过过期时间key的剩余过期秒数。-1表示key不存在或者未设置过期时间。
select db-index 通过索引选择数据库,默认连接的数据库是 0,默认数据库数是 16 个。返回 1表示成功,0 失败。
move key db-index 将 key 从当前数据库移动到指定数据库。返回 1 表示成功。0 表示 key不存在或者已经在指定数据库中

value

redis 提供五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)和zset(sorted set有序集合)。

string类型

string 是最基本的类型,而且 string 类型是二进制安全的。意思是 redis 的 string 可以包含任何数据。比如 jpg 图片或者序列化的对象。从内部实现来看其实 string 可以看作 byte数组,最大上限是 1G 字节。

string 类型数据操作指令简介

set key value 设置 key 对应 string 类型的值,返回 1 表示成功,0 失败。
setnx key value 如果 key 不存在,设置 key 对应 string 类型的值。如果 key 已经存在,返回 0。
get key 获取 key 对应的 string 值,如果 key 不存在返回 nil
getset key value 先获取 key 的值,再设置 key 的值。如果 key 不存在返回 nil。
mget key1 key2 ...... keyN 一次获取多个 key 的值,如果对应 key 不存在,则对应返回 nil。
mset key1 value1 ...... keyN valueN 一次设置多个 key 的值,成功返回 1 表示所有的值都设置了,失败返回 0 表示没有任何值被设置。
msetnx key1 value1 ...... keyN valueN 一次设置多个 key 的值,但是不会覆盖已经存在的 key
incr key 对 key 的值做++操作,并返回新的值。注意 incr 一个不是 int 的 value 会返回错误,incr 一个不存在的 key,则设置 key 值为 1。
decr key 对 key 的值做--操作,decr 一个不存在 key,则设置 key 值为-1。
incrby key integer 对 key 加上指定值 ,key 不存在时候会设置 key,并认为原来的 value是 0。
decrby key integer 对 key 减去指定值。decrby 完全是为了可读性,我们完全可以通过 incrby一个负值来实现同样效果,反之一样。
setrange key num newstr 替换字符串 num表示从第几位开始替换

hash类型

redis hash是一个键值对的集合,是一个string类型的field和value的映射表,适合用于存储对象。

hash-max-zipmap-entries 64 #配置字段最多 64 个
hash-max-zipmap-value 512 #配置 value 最大为 512 字节
hash  类型数据操作指令简介
hset key field value 设置 hash field 为指定值,如果 key 不存在,则创建
hget key field 获取指定的 hash field。
hmget key filed1....fieldN 获取全部指定的 hash filed。
hmset key filed1 value1 ...... filedN valueN 同时设置 hash 的多个 field。
hincrby key field integer 将指定的 hash filed 加上指定值。成功返回 hash filed 变更后的值。
hexists key field 检测指定 field 是否存在。
hdel key field 删除指定的 hash field。
hlen key 返回指定 hash 的 field 数量。
keys key 返回 hash 的所有 field。
hvals key 返回 hash 的所有 value。
hgetall 返回 hash 的所有 filed 和 valu

list类型

list 是一个链表结构,可以理解为一个每个子元素都是 string 类型的双向链表。主要功能是 push、pop、获取一个范围的所有值等。操作中 key 理解为链表的名字。

List 类型数据操作指令简介

lpush key string 在 key 对应 list 的头部添加字符串元素,返回 1 表示成功,0 表示 key 存在且不是 list 类型。栈的方式
rpush key string 在 key 对应 list 的尾部添加字符串元素。队列方式
llen key 返回 key 对应 list 的长度,如果 key 不存在返回 0,如果 key 对应类型不是 list返回错误。
lrange key start end 返回指定区间内的元素,下标从 0 开始,负值表示从后面计算,-1 表示倒数第一个元素 ,key 不存在返回空列表。
ltrim key start end 截取 list 指定区间内元素,成功返回 1,key 不存在返回错误。
lset key index value 设置 list 中指定下标的元素值,成功返回 1,key 或者下标不存在返回错误。
lrem key count value从 List 的头部(count正数)或尾部(count负数)删除一定数量(count )匹配 value 的元素,返回删除的元素数量。count 为 0 时候删除全部。
lpop key 从 list 的头部删除并返回删除元素。如果 key 对应 list 不存在或者是空返回 nil ,如果 key 对应值不是 list 返回错误。
rpop key 从 list 的尾部删除并返回删除元素。
blpop key1 ...... keyN timeout 从左到右扫描,返回对第一个非空 list 进行 lpop 操作并返回 ,
brpop 同 blpop,一个是从头部删除一个是从尾部删除。

set类型

set 是无序集合,最大可以包含(2 的 32 次方-1)个元素,不可重复。set 的是通过 hash table 实现的 ,所以添加,删除,查找的复杂度都是 O
(1)。

hash table 会随着添加或者删除自动的调整大小 。需要注意的是调整 hash table 大小时候需要同步(获取写锁)会阻塞其他读写操作。可能不久后就会改用跳表(skip list)来实现。跳表已经在 sorted sets 中使用了。

关于 set 集合类型除了基本的添加删除操作,其它有用的操作还包含集合的取并集(union),交集(intersection) ,差集(difference)。通过这些操作可以很容易的实现 SNS 中的好友推荐和 blog 的 tag 功能

set 类型数据操作指令简介

sadd key member 添加一个 string 元素到 key 对应 set 集合中,成功返回 1,如果元素以及在集合中则返回 0,key 对应的 set 不存在则返回错误。
srem key member 从 key 对应 set 中移除指定元素,成功返回 1,如果 member 在集合中不存在或者 key 不存在返回 0,如果 key 对应的不是 set 类型的值返回错误。
spop key 删除并返回 key 对应 set 中随机的一个元素,如果 set 是空或者 key 不存在返回nil。
srandmember key 同 spop,随机取 set 中的一个元素,但是不删除元素。
smove srckey dstkey member 从srckey对应set中移除member并添加到dstkey对应set中 ,整个操作是原子的。成功返回 1,如果 member 在 srckey 中不存在返回 0,如果 key 不是 set类型返回错误。
scard key 返回 set 的元素个数,如果 set 是空或者 key 不存在返回 0。
sismember key member 判断 member 是否在 set 中,存在返回 1,0 表示不存在或者 key 不存在。
sinter key1 key2 …… keyN 返回所有给定 key 的交集。
sinterstore dstkey key1 ....... keyN 返回所有给定 key 的交集,并保存交集存到 dstkey 下。
sunion key1 key2 ...... keyN 返回所有给定 key 的并集。
sunionstore dstkey key1 ...... keyN 返回所有给定 key 的并集,并保存并集到 dstkey 下。
sdiff key1 key2 ...... keyN 返回所有给定 key 的差集。
sdiffstore dstkey key1 ...... keyN 返回所有给定 key 的差集,并保存差集到 dstkey 下。
smembers key 返回 key 对应 set 的所有元素,结果是无序的

sorted set 类型

sorted t set 是有序集合,它在 set 的基础上增加了一个顺序属性,这一属性在添加修改元素的时候可以指定,每次指定后,会自动重新按新的值调整顺序。可以理解了有两列的mysql 表,一列存 value,一列存顺序。操作中 key 理解为 sorted set 的名字。

Sorted Set 类型数据操作指令简介

add key score member 添加元素到集合,元素在集合中存在则更新对应 score。
zrem key member 删除指定元素,1 表示成功,如果元素不存在返回 0。
zincrby key incr member 增加对应 member 的 score 值,然后移动元素并保持 skip list 保持有序。返回更新后的 score 值。
zrank key member 返回指定元素在集合中的排名(下标),集合中元素是按 score 从小到大排序的。
zrevrank key member 同上,但是集合中元素是按 score 从大到小排序。
zrange key start end 类似 lrange 操作从集合中去指定区间的元素。返回的是有序结果
zrevrange key start end 同上,返回结果是按 score 逆序的。
zrangebyscore key min max 返回集合中 score 在给定区间的元素。
zcount key min max 返回集合中 score 在给定区间的数量。
zcard key 返回集合中元素个数。
zscore key element 返回给定元素对应的 score。
zremrangebyrank key min max 删除集合中排名在给定区间的元素。
zremrangebyscore key min max 删除集合中 score 在给定区间的元素

Redis操作指令

keys *   查所有的key
exlire 设置某个key的过期时间,使用ttl查看剩余时间。
persist 取消过期时间
slect 选择数据库(数据库编号0-15)
move [key] [数据库下标] 将当前数据中的key转移到其他数据库中
randomkey 随机返回数据库里的一个key
rename 重命名key

DBSIZE 返回当前数据库 key 的数量。
INFO 返回当前 redis 服务器状态和一些统计信息。
MONITOR 实时监听并返回redis服务器接收到的所有请求信息 。
SHUTDOWN 把数据同步保存到磁盘上,并关闭redis服务。
CONFIG  GET parameter 获取一个 redis 配置参数信息。(个别参数可能无法获取)
CONFIG  SET parameter value 设置一个 redis 配置参数信息。(个别参数可能无法获取)
CONFIG  T RESETSTAT 重置 INFO 命令的统计信息。(重置包括:Keyspace 命中数、Keyspace 错误数、 处理命令数,接收连接数、过期 key 数)
DEBUG  OBJECT key 获取一个 key 的调试信息。
DEBUG  T SEGFAULT 制造一次服务器当机。
FLUSHDB 删除当前数据库中所有 key,此方法不会失败。小心慎用
FLUSHALL 删除全部数据库中所有 key,此方法不会失败。小心慎用

Http协议详解

发表于 2018-04-09 | 分类于 网络协议

HTTP简介

HTTP协议是Hyper Text Transfer Protocol(超文本传输协议)的缩写,是用于从万维网(WWW:World Wide Web )服务器传输超文本到本地浏览器的传送协议。
HTTP是一个基于TCP/IP通信协议来传递数据(HTML 文件, 图片文件, 查询结果等)。
HTTP是一个属于应用层的面向对象的协议,由于其简捷、快速的方式,适用于分布式超媒体信息系统。它于1990年提出,RFC 1945定义了HTTP/1.0版本。其中最著名的就是RFC 2616。RFC 2616定义了今天普遍使用的一个版本——HTTP 1.1。
HTTP是一个应用层协议,由请求和响应构成,是一个标准的客户端服务器模型。HTTP是一个无状态的协议。通常承载于TCP协议之上,有时也承载于TLS或SSL协议层之上,这个时候就是HTTPS。默认HTTP的端口号为80,HTTPS的端口号为443。
20200409120129

无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。

无连接:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。

URI和URL的区别

URI,是uniform resource identifier,统一资源标识符,用来唯一的标识一个资源。Web上可用的每种资源如HTML文档、图像、视频片段、程序等都是一个来URI来定位的。

URL是uniform resource locator,统一资源定位器,它是一种具体的URI,即URL可以用来标识一个资源,而且还指明了如何locate这个资源。URL是Internet上用来描述信息资源的字符串,主要用在各种WWW客户程序和服务器程序上,特别是著名的Mosaic。

URN,uniform resource name,统一资源命名,是通过名字来标识资源,比如mailto:java-net@java.sun.com。

URI是以一种抽象的,高层次概念定义统一资源标识,而URL和URN则是具体的资源标识的方式。URL和URN都是一种URI。笼统地说,每个 URL 都是 URI,但不一定每个 URI 都是 URL。这是因为 URI 还包括一个子类,即统一资源名称 (URN),它命名资源但不指定如何定位资源。上面的 mailto、news 和 isbn URI 都是 URN 的示例。

HTTP请求Request

请求报文4部分组成:请求行(request line)、请求头部(header)、空行和请求数据四个部分组成。

20200410094356

请求行:Method Request-URI HTTP-Version 结尾符(结尾符一般用\r\n)

请求头:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Date:表示消息产生的日期和时间
Cache-Control:用于指定缓存指令,缓存指令是单向的(响应中出现的缓存指令在请求中未必会出现),且是独立的(一个消息的缓存指令不会影响另一个消息处理的缓存机制)

Host:初始URL中的主机和端口
User-Agent:发送请求的浏览器类型、操作系统等信息
Accept:客户端可识别的内容类型列表,用于指定客户端接收那些类型的信息
Accept-Encoding:客户端可识别的数据编码
Accept-Language:表示浏览器所支持的语言类型
Authorization:授权信息,通常出现在对服务器发送的WWW-Authenticate头的应答中
Connection:允许客户端和服务器指定与请求/响应连接有关的选项,例如这是为Keep-Alive则表示保持连接。
Transfer-Encoding:告知接收端为了保证报文的可靠传输,对报文采用了什么编码方式。
Content-Length:表示请求消息正文的长度
Cookie:这是最重要的请求头信息之一

From:请求发送者的email地址,由一些特殊的Web客户程序使用,浏览器不会用到它

If-Modified-Since:只有当所请求的内容在指定的日期之后又经过修改才返回它,否则返回304“Not Modified”应答
Pragma:指定“no-cache”值表示服务器必须返回一个刷新后的文档,即使它是代理服务器而且已经有了页面的本地拷贝

Referer:包含一个URL,用户从该URL代表的页面出发访问当前请求的页面
User-Agent:浏览器类型,如果Servlet返回的内容与浏览器类型有关则该值非常有用

Content-Type:发送给接收者的实体正文的媒体类型
Content-Lenght:实体正文的长度
Content-Language:描述资源所用的自然语言,没有设置则该选项则认为实体内容将提供给所有的语言阅读
Content-Encoding:实体报头被用作媒体类型的修饰符,它的值指示了已经被应用到实体正文的附加内容的编码,因而要获得Content-Type报头域中所引用的媒体类型,必须采用相应的解码机制。
Last-Modified:实体报头用于指示资源的最后修改日期和时间
Expires:实体报头给出响应过期的日期和时间

HTTP请求Response

HTTP响应也由四个部分组成,分别是:状态行、消息报头、空行和响应正文。

20200410094437

状态行:HTTP协议版本号, 状态码, 状态消息 三部分组成。((HTTP/1.1)表明HTTP版本为1.1版本,状态码为200,状态消息为(ok))

响应报头

1
2
3
4
5
6
7
8
9
Allow:服务器支持哪些请求方法(如GET、POST等)
Content-Encoding:文档的编码(Encode)方法
Location:用于重定向接受者到一个新的位置,常用在更换域名的时候
Server:包含可服务器用来处理请求的系统信息,与User-Agent请求报头是相对应的
Date:表示消息产生的日期和时间
Content-Type:指定了MIME类型的HTML(text/html),编码类型是UTF-8
Content-Length:表示内容长度
Expires:指明应该在什么时候认为文档已经过期,从而不再缓存它
Refresh:表示浏览器应该在多少时间之后刷新文档,以秒计

状态码

请求成功

1
2
3
4
200 OK : 请求执行成功并返回相应数据,如 GET 成功
201 Created : 对象创建成功并返回相应资源数据,如 POST 成功;创建完成后响应头中应该携带头标 Location ,指向新建资源的地址
202 Accepted : 接受请求,但无法立即完成创建行为,比如其中涉及到一个需要花费若干小时才能完成的任务。返回的实体中应该包含当前状态的信息,以及指向处理状态监视器或状态预测的指针,以便客户端能够获取最新状态。
204 No Content : 请求执行成功,不返回相应资源数据,如 PATCH , DELETE 成功

重定向

重定向的新地址都需要在响应头 Location 中返回

1
2
3
4
301 Moved Permanently : 被请求的资源已永久移动到新位置
302 Found : 请求的资源现在临时从不同的 URI 响应请求
303 See Other : 对应当前请求的响应可以在另一个 URI 上被找到,客户端应该使用 GET 方法进行请求
307 Temporary Redirect : 对应当前请求的响应可以在另一个 URI 上被找到,客户端应该保持原有的请求方法进行请求

条件请求

1
2
3
304 Not Modified : 资源自从上次请求后没有再次发生变化,主要使用场景在于实现数据缓存
409 Conflict : 请求操作和资源的当前状态存在冲突。主要使用场景在于实现并发控制
412 Precondition Failed : 服务器在验证在请求的头字段中给出先决条件时,没能满足其中的一个或多个。主要使用场景在于实现并发控制

客户端错误

1
2
3
4
5
6
7
8
9
10
11
400 Bad Request : 请求体包含语法错误
401 Unauthorized : 需要验证用户身份,如果服务器就算是身份验证后也不允许客户访问资源,应该响应 403 Forbidden
403 Forbidden : 服务器拒绝执行
404 Not Found : 找不到目标资源
405 Method Not Allowed : 不允许执行目标方法,响应中应该带有 Allow 头,内容为对该资源有效的 HTTP 方法
406 Not Acceptable : 服务器不支持客户端请求的内容格式,但响应里会包含服务端能够给出的格式的数据,并在 Content-Type 中声明格式名称
410 Gone : 被请求的资源已被删除,只有在确定了这种情况是永久性的时候才可以使用,否则建议使用 404 Not Found
413 Payload Too Large : POST 或者 PUT 请求的消息实体过大
415 Unsupported Media Type : 服务器不支持请求中提交的数据的格式
422 Unprocessable Entity : 请求格式正确,但是由于含有语义错误,无法响应
428 Precondition Required : 要求先决条件,如果想要请求能成功必须满足一些预设的条件

服务端错误

1
2
3
4
500 Internal Server Error : 服务器遇到了一个未曾预料的状况,导致了它无法完成对请求的处理。
501 Not Implemented : 服务器不支持当前请求所需要的某个功能。
502 Bad Gateway : 作为网关或者代理工作的服务器尝试执行请求时,从上游服务器接收到无效的响应。
503 Service Unavailable : 由于临时的服务器维护或者过载,服务器当前无法处理请求。这个状况是临时的,并且将在一段时间以后恢复。如果能够预计延迟时间,那么响应中可以包含一个Retry-After 头用以标明这个延迟时间(内容可以为数字,单位为秒;或者是一个 HTTP 协议指定的时间格式)。如果没有给出这个 Retry-After 信息,那么客户端应当以处理 500 响应的方式处理它。

Cookie和Session

Cookie和Session都为了用来保存状态信息,都是保存客户端状态的机制,它们都是为了解决HTTP无状态的问题而所做的努力。

Session可以用Cookie来实现,也可以用URL回写的机制来实现。用Cookie来实现的Session可以认为是对Cookie更高级的应用。

两者的比较

Cookie和Session有以下明显的不同点:

  • Cookie将状态保存在客户端,Session将状态保存在服务器端;
  • Cookies是服务器在本地机器上存储的小段文本并随每一个请求发送至同一个服务器。
  • Session是针对每一个用户的,变量的值保存在服务器上,用一个sessionID来区分是哪个用户session变量,这个值是通过用户的浏览器在访问的时候返回给服务器,当客户禁用cookie时,这个值也可能设置为由get来返回给服务器;
  • 就安全性来说:当你访问一个使用session 的站点,同时在自己机子上建立一个cookie,建议在服务器端的SESSION机制更安全些.因为它不会任意读取客户存储的信息。

Session机制

服务器使用一种类似于散列表的结构来保存信息。

当程序需要为某个客户端的请求创建一个session的时候,服务器首先检查这个客户端的请求里是否已包含了一个session标识session id,如果已包含一个session id则说明以前已经为此客户端创建过session,服务器就按照session id把这个 session检索出来使用(如果检索不到,可能会新建一个),如果客户端请求不包含session id,则为此客户端创建一个session并且生成一个与此session相关联的session id,session id的值应该是一个既不会重复,又不容易被找到规律以仿造的字符串,这个 session id将被在本次响应中返回给客户端保存。

Session的实现方式

使用Cookie来实现:服务器给每个Session分配一个唯一的JSESSIONID,并通过Cookie发送给客户端。
URL回显:URL回写是指服务器在发送给浏览器页面的所有链接中都携带JSESSIONID的参数,这样客户端点击任何一个链接都会把JSESSIONID带会服务器。

Cookie的流程

服务器在响应消息中用Set-Cookie头将Cookie的内容回送给客户端,客户端在新的请求中将相同的内容携带在Cookie头中发送给服务器。从而实现会话的保持。

Web缓存

缓存会根据请求保存输出内容的副本,例如html页面,图片,文件,当下一个请求来到的时候:如果是相同的URL,缓存直接使用副本响应访问请求,而不是向源服务器再次发送请求。

与缓存相关的HTTP扩展消息头

1
2
3
4
5
6
7
Expires:指示响应内容过期的时间,格林威治时间GMT
Cache-Control:更细致的控制缓存的内容
Last-Modified:响应中资源最后一次修改的时间
ETag:响应中资源的校验值,在服务器上某个时段是唯一标识的。
Date:服务器的时间
If-Modified-Since:客户端存取的该资源最后一次修改的时间,同Last-Modified。
If-None-Match:客户端存取的该资源的检验值,同ETag。

缓存生效流程

服务器收到请求时,会在200OK中回送该资源的Last-Modified和ETag头,客户端将该资源保存在cache中,并记录这两个属性。当客户端需要发送相同的请求时,会在请求中携带If-Modified-Since和If-None-Match两个头。两个头的值分别是响应中Last-Modified和ETag头的值。服务器通过这两个头判断本地资源未发生变化,客户端不需要重新下载,返回304响应。

HTTP定义了3种缓存机制

Freshness:允许一个回应消息可以在源服务器不被重新检查,并且可以由服务器和客户端来控制。例如,Expires回应头给了一个文档不可用的时间。Cache-Control中的max-age标识指明了缓存的最长时间;

Validation:用来检查以一个缓存的回应是否仍然可用。例如,如果一个回应有一个Last-Modified回应头,缓存能够使用If-Modified-Since来判断是否已改变,以便判断根据情况发送请求;

Invalidation: 在另一个请求通过缓存的时候,常常有一个副作用。例如,如果一个URL关联到一个缓存回应,但是其后跟着POST、PUT和DELETE的请求的话,缓存就会过期。

https

HTTPS(全称:Hypertext Transfer Protocol over Secure Socket Layer),是以安全为目标的HTTP通道,简单讲是HTTP的安全版。即HTTP下加入SSL层,HTTPS的安全基础是SSL。

有两种基本的加解密算法类型:

  • 对称加密:密钥只有一个,加密解密为同一个密码,且加解密速度快,典型的对称加密算法有DES、AES等;
  • 非对称加密:密钥成对出现(且根据公钥无法推知私钥,根据私钥也无法推知公钥),加密解密使用不同密钥(公钥加密需要私钥解密,私钥加密需要公钥解密),相对对称加密速度较慢,典型的非对称加密算法有RSA、DSA等。

    https的通信过程

    20200410094322

Redis介绍和下载安装配置

发表于 2018-04-08 | 分类于 Redis

简介

redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存,事件发布或订阅,高速队列等场景。该数据库使用ANSI C语言编写,支持网络,提供字符串,哈希,列表,队列,集合结构直接存取,基于内存,可持久化。

redis的应用场景有:
1,会话缓存(最常用)
2,消息队列,比如支付3,活动排行榜或计数
4,发布,订阅消息(消息通知)
5,商品列表,评论列表等

redis数据类型:
Redis一共支持五种数据类:string(字符串),hash(哈希),list(列表),set(集合)和zset(sorted set有序集合)。

下载安装

在CentOS搭建Redis环境,下载命令:http://download.redis.io/releases/redis-4.0.12.tar.gz下载到/usr/local/tmp

解压:

1
2
3
[root@localhost ~]# cd /usr/local/
[root@localhost local]# tar -zxvf redis-4.0.12.tar.gz
[root@localhost local]# cd redis-4.0.12

编译安装:

1
2
3
4
5
6
7
8
9
10
[root@localhost redis-4.0.12]# make
make[1]: Leaving directory `/usr/local/redis-4.0.12/src'
[root@localhost redis-4.0.12]# cd src
[root@localhost src]# make install
INSTALL install
INSTALL install
INSTALL install
INSTALL install
INSTALL install
安装成功!

make install PREFIX=/usr/local/redis 安装到指定位置

可执行文件解释:

#redis-server:Redis 服务器的启动程序
#redis-cli:Redis 命令行操作客户端。
#redis-benchmark:Redis 性能测试工具,测试 Redis 在你的系统及你的配置下的读写性能。
    比如:redis-benchmark -n 100000 -c 50 #模拟同时由 50 个客户端发送 100000 个 SETs/GETs 查询
#redis-check-aof:更新日志检查
#redis-check-dump:本地数据库检查

配置

为了方便管理,将Redis文件中的conf配置文件和常用命令移动到统一文件中

创建目录:

1
2
3
4
    [root@localhost src]# mkdir -p /usr/local/redis/etc
[root@localhost src]# mkdir -p /usr/local/redis/bin
```
复制安装文件的配置文件:

[root@localhost src]# mv mkreleasehdr.sh redis-benchmark redis-check-aof redis-check-rdb redis-cli redis-sentinel redis-server redis-trib.rb /usr/local/redis/bin/
[root@localhost src]# cp ../redis.conf /usr/local/redis/etc/

1
2
3
4
5
6
7
8
## 执行redis-server启动服务

直接执行Redis-server 启动的Redis服务,是在前台直接运行的,如果Lunix关闭当前会话,则Redis服务也随即关闭。
正常情况下,启动Redis服务需要从后台启动,并且指定启动配置文件。
Redis配置文:将daemonize 配置项改为yes
daemonize yes 表明在后台运行

启动并指定配置文件:

[root@localhost bin]# ./redis-server ../etc/redis.conf
6531:C 19 Feb 14:32:11.004 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
6531:C 19 Feb 14:32:11.004 # Redis version=4.0.12, bits=64, commit=00000000, modified=0, pid=6531, just started
6531:C 19 Feb 14:32:11.004 # Configuration loaded

1
查看服务:

[root@localhost bin]# ps -ef | grep redis
root 6532 1 0 14:32 ? 00:00:00 ./redis-server 127.0.0.1:6379
root 6549 2316 0 14:32 pts/0 00:00:00 grep redis

1
进入客户端:

[root@localhost bin]# ./redis-cli
127.0.0.1:6379>

1
退出客户端:

127.0.0.1:6379> quit

1
2
3
4
5
6
退出服务:

[root@localhost bin]# redis-cli shutdown
或者pkill redis-server 或kill 进程号

## 配置文件基本说明

daemonize: #是否以后台守护进程方式运行
pidfile: #pid 文件位置
port: #监听的端口号
timeout: #请求超时时间
loglevel: #log 信息级别,总共支持四个级别:debug、verbose、notice、warning ,默认为 verbose
logfile: #默认为标准输出(stdout),如果配置为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给/dev/null
databases: #开启数据库的数量。使用”SELECT 库 ID”方式切换操作各个数据库
save * : #保存快照的频率,第一个表示多长时间,第二个*表示执行多少次写操作。在一定时间内执行一定数量的写操作时,自动保存快照。可设置多个条件。
rdbcompression:#保存快照是否使用压缩
dbfilename: #数据快照文件名(只是文件名,不包括目录)。默认值为 dump.rdb
dir: #数据快照的保存目录(这个是目录)
requirepass: #设置 Redis 连接密码,如果配置了连接密码,客户端在连接 Redis 时需要通过 AUTH 命令提供密码,默认关闭。


redis4安装

    1.下载redis
    wget http://download.redis.io/releases/redis-4.0.10.tar.gz
    2.解压安装包
    tar -zxf redis-4.0.10.tar.gz -C /usr/local/work/
    3.编译安装
    ln -s /usr/local/work/redis-4.0.10/ /usr/local/work/redis
    cd /usr/local/work/redis
    make & make install
    4.初始化redis
    ./utils/install_server.sh

    #一直Enter即为默认配置
    #设置redis的端口,启动的配置文件位置,日志位置以及可执行脚本的位置
    5.取消安全模式和本地访问绑定
    sed -i 's/slave-read-only yes/slave-read-only no/g' /etc/redis/6379.conf
    sed -i 's/bind 127.0.0.1/#bind 127.0.0.1/g' /etc/redis/6379.conf
    6.启动服务
    systemctl start redis_6379
    systemctl enable redis_6379

    Docker上安装redis

   docker pull redis:latest

    docker run -itd --name redis-test -p 6379:6379 redis

Mybatis缓存和mapper配置

发表于 2018-04-05 | 分类于 mybatis

mapper配置

标签介绍

insert,update,delete,select,sql,resultMap

sql:可被其它语句引用的可重用语句块;resultMap:确定实体类属性与表中字段对应关系;

namespace的作用

在MyBatis中,Mapper中的namespace用于绑定Dao接口的,即面向接口编程。

它的好处在于当使用了namespace之后就可以不用写接口实现类,业务逻辑会直接通过这个绑定寻找到相对应的SQL语句进行对应的数据处理

parametetType属性

在insert,update,select,delete标签中,可以通过parameterType指定输入参数的类型,类型可以是简单类型、map、pojo的包装类型。

parameterType属性是可以省略的.MyBatis框架可以根据SqlSession接口中方法的参数来判断输入参数的实际数据类型.

#{}和${}区别

在MyBatis中提供了两种方式读取参数的内容到SQL语句中,分别是

1
2
#{参数名} :实体类对象或则Map集合读取内容,采用预编译方式,可以防止SQL注入 
${参数名} :实体类对象或则Map集合读取内容,采用直接赋值方式,无法阻止SQL注入攻击

在大多数情况下,我们都是采用#{}读取参数内容,但是在一些特殊的情况下:表名、选取的列是动态的,order by和in操作,可以考虑使用${}。

resultType属性

  • resultType属性存在select标签.负责将查询结果进行映射.
  • resultType属性可以指定一个基本类型也可以是一个实体类类型
  • 使用resultType属性为实体类类型时,只有查询出来的列名和实体类中的属性名一致,才可以映射成功. 如果查询出来的列名和pojo中的属性名全部不一致,就不会创建实体类对象.但是只要查询出来的列名和实体类中的属性有一个一致,就会创建实体类对象
  • resultType属性无法与resultMap属性同时出现.

注解方式配置

注解方式就是将SQL语句直接写在接口上,对于需求比较简单的系统,效率较高。缺点在于,每次修改sql语句都要编译代码,对于复杂的sql语句可编辑性和可读性都差,一般不建议使用这种配置方式。

1
2
3
4
5
6
@Select
@Results
@ResultMap
@Insert
@Update
@Delete

typeAliases 别名

全局配置mybatis.xml 文件中

MyBatis事务

在 mybatis 中默认是关闭了 JDBC 的自动提交功能setAutoCommit(false)。

每一个 SqlSession 默认都是不自动提交事务,需要使用session.commit()提交事务。或者使用openSession(true);全局设置自动提交,相当于setAutoCommittrue);

在 openSession()时 Mybatis 会创建 SqlSession 时同时创建一个Transaction(事务对象),同时 autoCommit 都为 false。如果出现异常,应该 session.rollback()回滚事务。

mybatis 中 标签没有 resultType 属性,默认了返回值都是 int

1
2
3
<insert id="insertUser" parameterType="User">
insert into user values(default,#{name},#{age})
</insert>

Java:

1
2
3
4
5
6
7
8
9
//插入数据
try {
int i=session.insert("com.hu.mapper.UserMapper.insertUser", new User("张无忌1", 28));
System.out.println(i>0?"插入成功":"插入失败");
} catch (Exception e) {
session.rollback();
e.printStackTrace();
}
session.commit();

动态SQL

根据不同的条件需要执行不同的 SQL 命令.称为动态 SQL,MyBatis 中动态 SQL 在 mapper.xml 中添加逻辑判断。

<if>

例子:

1
2
3
4
5
6
7
8
9
10
<select id="getAllUser" resultType="user">
select * from user where 1=1
<!-- OGNL 表达式,直接写 key 或对象的属性 不需要添加任 何特字符号 -->
<if test="id!=null and id!=''">
and id=#{id}
</if>
<if test="name!=null and name!=''">
and name=#{name}
</if>
</select>

<where>

此时where不好处理,还要加一个1=1,因此引入where标签。当编写 where 标签时,如果内容中第一个是 and 去掉第 一个and, 如果<where>中有内容会生成 where 关键字,如果没有内容不生成 where 关键字。

1
2
3
4
5
6
7
8
9
10
11
<select id="getAllUser" resultType="user">
select * from user
<where>
<if test="id!=null and id!=''">
and id=#{id}
</if>
<if test="name!=null and name!=''">
and name=#{name}
</if>
</where>
</select>

<choose><when> <otherwise>

有时我们不想应用所有的条件, 相反我们想选择很多情况下的一种。 Java 中的 switch 和语句相似,MyBatis 提供 choose 元素
只要有一个成立其他都不成立。

1
2
3
4
5
6
7
8
9
10
<select id="getAllUser" resultType="user">
select * from user
<where>
<choose>
<when test="id!=null and id!=''">and id=#{id}</when>
<when test="name!=null and name!=''">and name=#{name}</when>
<otherwise>and 1=1</otherwise><!-- 其他情况 -->
</choose>
</where>
</select>

<set>

用在修改 SQL 中 set 从句。用来去掉最后一个逗号。如果<set>里面有内容生成 set 关键字,没有就不生成。
例:

1
2
3
4
5
6
7
8
9
10
<update id="updateUserByid">
update user
<set>
id=#{id},<!-- 目的防止<set>中没有内容 mybatis 不生成 set关键字 如果修改中没有 set 从句 SQL 语法错误 -->
<if test="name!=null and name!=''">
name=#{name},
</if>
</set>
where id=#{id}
</update>

接口:

1
2
3
4
5
6
7
8
9
10
11
public int updateUserByid(@Param("id")Integer id, @Param("name")String name);
测试:
//把id为1的记录修改name为李连杰
try {
int index = userMapper.updateUserByid(1, "李连杰");
System.out.println(index);
} catch (Exception e) {
session.rollback();
e.printStackTrace();
}
session.commit();

结果:

<Trim>

标签属性:

1
2
3
4
5
prefix 在前面添加内容
prefixOverrides 去掉前面内容
suffix 在后面添加内容
suffixOverrieds 去掉后面内容
执行顺序去掉内容后添加内容
1
2
3
4
5
6
7
<update id="upd" parameterType="User">
update user
<trim prefix="set" suffixOverrides=",">
name=#{name}
</trim>
where id=#{id}
</update>

得到sql语句就是:update user set name=? where id=?

<bind>

给参数重新赋值,比如在原内容前后添加内容,如:模糊查询。

1
2
3
4
<select id="selByUserNameLike" resultType="user">
<bind name="name" value="'%'+name+'%'"/>
select * from user where name like #{name}
</select>
1
2
3
4
public List<User> selByUserNameLike(@Param("name")String name);

List<User> users = userMapper.selByUserNameLike("张");
System.out.println(users.size());

<foreach>

循环参数内容,还具备在内容的前后添加内容,还具备添加分隔符功能。 适用场景:in 查询中,批量新增中(mybatis 中foreach 效率比较低)

1
2
3
4
5
6
<select id="selIn" parameterType="list" resultType="user">
select * from user where name in
<foreach collection="list" item="name" open="(" close=")" separator=",">
#{name}
</foreach>
</select>

接口:
public List selIn(@Param(“list”)List<?> list);

测试:

1
2
3
4
5
List<String> list = new ArrayList<>();
list.add("张三丰");
list.add("李连杰");
List<User> users = userMapper.selIn(list);
System.out.println(users.size());

结果:

插入集合数据

1
2
3
4
5
6
<insert id="insertList" parameterType="list">
insert into user values
<foreach collection="list" item="name" separator=",">
(default,#{name },30)
</foreach>
</insert>

接口:
public int insertList(@Param(“list”)List<?> list);

测试:

1
2
3
4
5
6
7
List<String> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
list.add("古龙"+i);
}
int index = userMapper.insertList(list);
System.out.println(index);
session.commit();

结果:

如果是批量插入数据,应该这样处理。

创建session时,openSession()必须指定参数。 底层 JDBC 的 PreparedStatement.addBatch();

factory.openSession(ExecutorType.BATCH)

<sql> <include>

某些 SQL 片段如果希望复用,可以使用<sql>定义这个片段,在<select>或<delete>或<update>或<insert>中使用<include>

Mybatis 注解

Mybatis 的注解简化 mapper.xml 文件,如果涉及动态 SQL 依然使用 mapper.xml,mapper.xml 和注解可以共存,使用注解时 mybatis.xml 中<mappers>使用

1
2
3
<package/>
<mapper class=""/>
<mapper class="com.hu.mapper.StudentTeacher"/>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface StudentTeacher {
/**
* 实现查询
* @return
*/
@Select("select * from student")
List<Student> selectAll();
/**
* 实现插入
* @param student
* @return
*/
@Insert("insert into student values(default,#{name},#{age},2)")
int insertStudent(Student student);
/**
* 实现修改
* @param student
* @return
*/
@Update("update student set name=#{name} where id=#{id}")
int updateStudent(Student student);
/**
* 实现删除操作
* @param id
* @return
*/
@Delete("delete from student where id=#{0}")
int deleteById(int id);
}

SqlSession session = MyBatisUtil.getSession();
StudentTeacher st = session.getMapper(StudentTeacher.class);
//查询
List<Student> students = st.selectAll();
System.out.println(students.size());
//增加
Student student = new Student();
student.setAge(44);
student.setName("小明");
int index = st.insertStudent(student);
System.out.println(index);
//..................

使用注解实现resultMap:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 使用注解实现resultMap
* @param id
* @return
*/
@Results(value= {
@Result(id=true,property="id",column="id"),
@Result(property="name",column="name"),
@Result(property="students",column="id",
many=@Many(select="com.hu.mapper.StudentMapper.selectByTid"))
})
@Select("select * from teacher where id=#{0}")
Teacher selTeacherById (int id);
//----------------------
public static void main(String[] args) {
SqlSession session = MyBatisUtil.getSession();
StudentTeacher st = session.getMapper(StudentTeacher.class);
//查询
Teacher teacher = st.selTeacherById(1);
System.out.println(teacher.toString());

session.commit();
MyBatisUtil.closeSession();
}
1
2
3
4
5
@Results() 相当于<resultMap>
@Result() 相当于<id/>或<result/>
@Result(id=true) 相当与<id/>
@Many() 相当于<collection/>
@One() 相当于<association/>

缓存

一级缓存

基于PerpetualCache 的 HashMap本地缓存,其存储作用域为 SqlSession ,当 Session flush 或 close 之后,该Session中的所有 ache 就将清空。缓存的是statement,刷新缓存是清空这个 SqlSession 的所有缓存, 不单单是某个键。任何的 UPDATE, INSERT, DELETE 语句都会清空缓存。

MyBatis 默认开启了一级缓存,一级缓存是在SqlSession 层面进行缓存的。同一个session多次调用同一个Mapper和同一个方法的同一个参数,只会进行一次数据库查询,然后把数据缓存,以后直接先从缓存中取出数据,不会直接去查数据库。

1
2
3
4
5
6
7
8
public static void main(String[] args) throws IOException {
SqlSession session = MyBatisUtil.getSession();

User user = session.selectOne("com.hu.mapper.UserMapper.selectById",2);
System.out.println(user);
User user1 = session.selectOne("com.hu.mapper.UserMapper.selectById",2);
System.out.println(user1);
}

同一个session,结果只查一次数据库。结果如下:
20200407084528

二级缓存

二级缓存又叫SQLSessionFactory缓存,二级缓存存在于 SqlSessionFactory 生命周期中。二级缓存与一级缓存其机制相同,默认也是采用 PerpetualCache,HashMap存储,不同在于其存储作用域为 Mapper(Namespace),并且可自定义存储源Ehcache。
对于缓存数据更新机制,当某一个作用域(一级缓存Session/二级缓存Namespaces)的进行了 C/U/D 操作后,默认该作用域下所有 select 中的缓存将被clear。当 SqlSession 对象 close()时或 commit()时会把 SqlSession 缓存的数据刷(flush)到 SqlSessionFactory 缓存区中。

有效范围:同一个 factory 内所有SqlSession 都可以获取。二级缓存在数据频繁被使用,很少被修改时使用性能最高。

使用二级缓存步骤:在 mapper.xml 中添加<cache readOnly=”true”></cache> 如果不写 readOnly=”true”需要把实体类序列化,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws IOException {

SqlSession session = MyBatisUtil.getSession();

User user = session.selectOne("com.hu.mapper.UserMapper.selectById",2);
System.out.println(user);
session.commit();
//另外一个session
SqlSession session1 = MyBatisUtil.getSession();
User user1 = session1.selectOne("com.hu.mapper.UserMapper.selectById",2);
System.out.println(user1);
session.commit();
}

20200407084850

说明:可以在开启二级缓存时候,手动配置一些属性

1
<cache eviction="LRU" flushInterval="100000" size="1024" readOnly="true"/>

各个属性意义如下:

1
2
3
4
5
6
7
8
eviction:缓存回收策略
- LRU:最少使用原则,移除最长时间不使用的对象
- FIFO:先进先出原则,按照对象进入缓存顺序进行回收
- SOFT:软引用,移除基于垃圾回收器状态和软引用规则的对象
- WEAK:弱引用,更积极的移除移除基于垃圾回收器状态和弱引用规则的对象
flushInterval:刷新时间间隔,单位为毫秒,这里配置的100毫秒。如果不配置,那么只有在进行数据库修改操作才会被动刷新缓存区
size:引用额数目,代表缓存最多可以存储的对象个数
readOnly:是否只读,如果为true,则所有相同的sql语句返回的是同一个对象(有助于提高性能,但并发操作同一条数据时,可能不安全),如果设置为false,则相同的sql,后面访问的是cache的clone副本。

可以在Mapper的具体方法下设置对二级缓存的访问:useCache配置, 如果一条语句每次都需要最新的数据,就意味着每次都需要从数据库中查询数据,可以把这个属性设置为false,如:

1
<select id="selectAll" resultMap="BaseResultMap" useCache="false">

刷新缓存(清空缓存)

二级缓存默认会在insert、update、delete操作后刷新缓存,可以手动配置不更新缓存,如下:

1
<update id="updateById" parameterType="User" flushCache="false" />

如果不做任何处理,都为默认配置,则结果如下:  
* 1. 映射语句文件中的所有select语句将会被缓存。
  * 2. 映射语句文件中的所有insert,update和delete语句会刷新缓存。
  * 3. 缓存会使用Least Recently Used(LRU,最近最少使用的)算法来收回。
  * 4. 缓存会根据指定的时间间隔来刷新。
  * 5. 缓存会存储1024个对象

自定义缓存

自定义缓存对象,该对象必须实现 org.apache.ibatis.cache.Cache 接口,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class CacheTest implements Cache {
private ReadWriteLock lock = new ReentrantReadWriteLock();
private ConcurrentHashMap<Object, Object> cache = new ConcurrentHashMap<Object, Object>();
private String id;

public CacheTest() {
System.out.println("初始化1");
}

// 必须有该构造函数
public CacheTest(String id) {
System.out.println("初始化2");
this.id = id;
}

// 获取缓存编号
public String getId() {
System.out.println("得到ID:" + id);
return id;
}

// 获取缓存对象的大小
public int getSize() {
System.out.println("获取缓存大小!");
return 0;
}

// 保存key值缓存对象
public void putObject(Object key, Object value) {
System.out.println("往缓存中添加元素:key=" + key + ",value=" + value);
cache.put(key, value);
}

// 通过KEY
public Object getObject(Object key) {
System.out.println("通过Key获取值的Key:" + key);
System.out.println("通过Key获取的值为:"+cache.get(key));
return cache.get(key);
}

// 通过key删除缓存对象
public Object removeObject(Object key) {
System.out.println("移除缓存对象:" + key);
return null;
}

// 清空缓存
public void clear() {
System.out.println("清除缓存!");
cache.clear();
}

// 获取缓存的读写锁
public ReadWriteLock getReadWriteLock() {
System.out.println("获取锁对象!!!");
return lock;
}
}

配置:

1
<cache readOnly="true" type="com.hu.test.CacheTest"></cache>

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {

SqlSession session = MyBatisUtil.getSession();

User user = session.selectOne("com.hu.mapper.UserMapper.selectById",2);
System.out.println(user);
System.out.println("------------------------------------------------------------------------");
session.commit();
//另外一个session
SqlSession session1 = MyBatisUtil.getSession();
User user1 = session1.selectOne("com.hu.mapper.UserMapper.selectById",2);
System.out.println(user1);
session1.commit();
}

20200407092946

每次查询数据库前,MyBatis都会先在缓存中查找是否有该缓存对象。只有当调用了commit() 方法,MyBatis才会往缓存中写入数据,数据记录的键为 数字编号+Mapper名+方法名+SQL语句+参数 格式,值为返回的对象值。

Redis集群搭建(节点添加删除)

发表于 2018-04-05 | 分类于 Redis

一、Redis Cluster(Redis集群)简介

1、redis是一个开源的key value存储系统,受到了广大互联网公司的青睐。redis3.0版本之前只支持单例模式,在3.0版本及以后才支持集群,我这里用的是redis3.0.0版本;
2、所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽。redis集群采用P2P模式,是完全去中心化的,不存在中心节点或者代理节点;
3、redis集群是没有统一的入口的,客户端(client)连接集群的时候连接集群中的任意节点(node)即可,集群内部的节点是相互通信的(PING-PONG机制),每个节点都是一个redis实例;
4、为了实现集群的高可用,即判断节点是否健康(能否正常使用),redis-cluster有这么一个投票容错机制:如果集群中超过半数的节点投票认为某个节点挂了,那么这个节点就挂了(fail)。这是判断节点是否挂了的方法;
5、那么如何判断集群是否挂了呢? -> 如果集群中任意一个节点挂了,而且该节点没有从节点(备份节点),那么这个集群就挂了。这是判断集群是否挂了的方法;

那么为什么任意一个节点挂了(没有从节点)这个集群就挂了呢? -> 因为集群内置了16384个slot(哈希槽),并且把所有的物理节点映射到了这16384[0-16383]个slot上,或者说把这些slot均等的分配给了各个节点。当需要在Redis集群存放一个数据(key-value)时,redis会先对这个key进行crc16算法,然后得到一个结果。再把这个结果对16384进行求余,这个余数会对应[0-16383]其中一个槽,进而决定key-value存储到哪个节点中。所以一旦某个节点挂了,该节点对应的slot就无法使用,那么就会导致集群无法正常工作。

如果集群超过半数以上master挂掉,无论是否有slave,集群进入fail状态.

综上所述,每个Redis集群理论上最多可以有16384个节点。

关于redis的集群化方案 目前有三种:

(1)Twitter开发的twemproxy
    简介:twemproxy架构简单就是用proxy对后端redis server进行代理 但是由于代理层的消耗性能很低 而且通常涉及多个key的操作都是不支持的 而且本身不支持动态扩容和透明的数据迁移 而且也失去维护 Twitter内部已经不使用了
(2)豌豆荚开发的codis
        codis使用的也是proxy思路,但是做的比较好,是这两种之间的一个中间级,而且支持redis命令是最多的,有图形化GUI管理和监控工具。
(3)redis官方的redis-cluster
        redis-cluster是三个里性能最强大的。因为他使用去中心化的思想,使用hash slot方式,将16348个hash slot 覆盖到所有节点上,对于存储的每个key值 使用CRC16(KEY)&16348=slot 得到他对应的hash slot 并在访问key时就去找他的hash slot在哪一个节点上 然后由当前访问节点从实际被分配了这个hash slot的节点去取数据 节点之间使用轻量协议通信,减少带宽占用,性能很高,自动实现负载均衡与高可用,自动实现failover。并且支持动态扩展,官方已经玩到可以1000个节点 实现的复杂度低,是全新的思路。
        但是它也有一些不足。例如官方没有提供图形化管理工具,运维体验差,全手工数据迁移 ,且自己对自己本身的redis命令支持也不完全等

二、集群搭建需要的环境

Redis集群至少需要3个节点,因为投票容错机制要求超过半数节点认为某个节点挂了该节点才是挂了,所以2个节点无法构成集群。
要保证集群的高可用,需要每个节点都有从节点,也就是备份节点,所以Redis集群至少需要6台服务器。

1、安装ruby

创建集群集群脚本是ruby写的

yum -y install ruby
yum -y install rubygems
gem install redis  #安装Redis和ruby的接口

[root@localhost redis]# gem install redis
ERROR:  Error installing redis:
    redis requires Ruby version >= 2.2.2.

yum库中ruby的版本支持到 2.0.0,可gem 安装redis需要最低是2.2.2。解决办法是采用rvm来更新ruby
安装yum install curl

按照rvm.io提示执行gpg2 –recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
curl: (35) SSL connectgpg2 –recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
gpg: requesting key D39DC0E3 from hkp server keys.gnupg.net
gpg: requesting key 39499BDB from hkp server keys.gnupg.net
gpg: key D39DC0E3: “Michal Papis (RVM signing) mpapis@gmail.com“ not changed
gpg: key D39DC0E3: “Totally Legit Signing Key mallory@example.org“ not changed
gpg: key 39499BDB: “Piotr Kuczynski piotr.kuczynski@gmail.com“ not changed
gpg: Total number processed: 3
gpg: unchanged: 3

不通过,下一步的\curl -sSL https://get.rvm.io | bash -s stable
或curl -L get.rvm.io | bash -s stable

也就不行。

寻找了很多招办法都不行,只能通过手动下载ruby高版本手动安装,或者通过另外一个工具rbenv安装

手动下载高版本rub安装

解压

[root@localhost tmp]# tar -zxvf ruby-2.6.1.tar.gz -C ../ruby-2.6
[root@localhost tmp]# ./configure --prefix=/usr/local/ruby -prefix是将ruby安装到指定目录,也可以自定义
[root@localhost tmp]# make && make install

安装完成

[root@localhost ruby]# bin/ruby -v
ruby 2.6.1p33 (2019-01-30 revision 66950) [x86_64-linux]

把ruby加入环境变量

[root@localhost ruby]# vim /etc/profile
[root@localhost ruby]# source /etc/profile
[root@localhost ruby]# echo $PATH
/usr/local/ruby/bin:/usr/local/jdk8/bin:/usr/local/jdk8/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin

其中设置PATH路径,把安装的ruby放在系统PATH前面,避免调用操作系统自带的ruby

这回执行gem install redis #安装Redis和ruby的接口
成功

2、配置集群

由于没有这么多Linux,就在同一台上模拟配置6个Redis实例作为六个节点。

在/usr/local/下新建redis_cluster文件,里边新建六个目录node6001~node6006当做六个节点

拷贝创建集群的脚本文件redis-trib.rb到redis_cluster

在分别拷贝配置文件Redis.conf到这些目录下,并且依次修改为自己对应的节点信息。

依次修改信息如下:

port  6001    
bind 192.168.18.140                           #默认ip为127.0.0.1 需要改为其他节点机器可访问的ip 否则创建集群时无法访问对应的端口,无法创建集群
daemonize    yes                               #redis后台运行
pidfile /var/run/redis_6001.pid          #PID
cluster-enabled  yes                        #开启集群  把注释#去掉
cluster-config-file nodes-6001.conf   #集群的配置  配置文件首次启动自动生成6001/6002...
cluster-node-timeout  15000               #请求超时  默认15秒,可自行设置
appendonly  yes                           #aof日志开启  有需要就开启,它会每次写操作都记录一条日志
dir /usr/local/redis-cluster/node6001/    #默认是./,分别修改为当前节点下

启动并检查六个节点是否启动

编写脚本\start-all.sh

#start 6 redis
/usr/local/redis/bin/redis-server /usr/local/redis-cluster/node6001/redis.conf
/usr/local/redis/bin/redis-server /usr/local/redis-cluster/node6002/redis.conf
/usr/local/redis/bin/redis-server /usr/local/redis-cluster/node6003/redis.conf
/usr/local/redis/bin/redis-server /usr/local/redis-cluster/node6004/redis.conf
/usr/local/redis/bin/redis-server /usr/local/redis-cluster/node6005/redis.conf
/usr/local/redis/bin/redis-server /usr/local/redis-cluster/node6006/redis.conf

stop-all.sh
#stop 6 redis
/usr/local/redis/bin/redis-cli -h 192.168.18.140 -p 6001 shutdown
/usr/local/redis/bin/redis-cli -h 192.168.18.140 -p 6002 shutdown
/usr/local/redis/bin/redis-cli -h 192.168.18.140 -p 6003 shutdown
/usr/local/redis/bin/redis-cli -h 192.168.18.140 -p 6004 shutdown
/usr/local/redis/bin/redis-cli -h 192.168.18.140 -p 6005 shutdown
/usr/local/redis/bin/redis-cli -h 192.168.18.140 -p 6006 shutdown

赋权:chmod 777 start-all.sh stop-all.sh

执行启动脚本,并检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

[root@localhost redis-cluster]# ./start-all.sh
[root@localhost redis-cluster]# netstat -tnlp | grep redis
tcp 0 0 192.168.18.140:6001 0.0.0.0:* LISTEN 34204/redis-server
tcp 0 0 192.168.18.140:6002 0.0.0.0:* LISTEN 34206/redis-server
tcp 0 0 192.168.18.140:6003 0.0.0.0:* LISTEN 34208/redis-server
tcp 0 0 192.168.18.140:6004 0.0.0.0:* LISTEN 34213/redis-server
tcp 0 0 192.168.18.140:6005 0.0.0.0:* LISTEN 34218/redis-server
tcp 0 0 192.168.18.140:6006 0.0.0.0:* LISTEN 34223/redis-server
tcp 0 0 192.168.18.140:16001 0.0.0.0:* LISTEN 34204/redis-server
tcp 0 0 192.168.18.140:16002 0.0.0.0:* LISTEN 34206/redis-server
tcp 0 0 192.168.18.140:16003 0.0.0.0:* LISTEN 34208/redis-server
tcp 0 0 192.168.18.140:16004 0.0.0.0:* LISTEN 34213/redis-server
tcp 0 0 192.168.18.140:16005 0.0.0.0:* LISTEN 34218/redis-server
tcp 0 0 192.168.18.140:16006 0.0.0.0:* LISTEN 34223/redis-server

[root@localhost redis-cluster]# ps -el | grep redis
5 S 0 34204 1 0 80 0 - 36491 ep_pol ? 00:00:00 redis-server
5 S 0 34206 1 0 80 0 - 36491 ep_pol ? 00:00:00 redis-server
5 S 0 34208 1 0 80 0 - 35979 ep_pol ? 00:00:00 redis-server
5 S 0 34213 1 0 80 0 - 35979 ep_pol ? 00:00:00 redis-server
5 S 0 34218 1 0 80 0 - 36491 ep_pol ? 00:00:00 redis-server
5 S 0 34223 1 0 80 0 - 36491 ep_pol ? 00:00:00 redis-server

执行关闭脚本并检查
[root@localhost redis-cluster]# ./stop-all.sh
[root@localhost redis-cluster]# netstat -tnlp | grep redis
[root@localhost redis-cluster]# ps -el | grep redis

一切没问题。

查看集群包括各个节点文件得到如下,文件结构没有问题:

../redis-cluster/
├── node6001
│  ├── appendonly.aof
│  ├── dump.rdb
│  ├── nodes-6001.conf
│  └── redis.conf
├── node6002
│  ├── appendonly.aof
│  ├── dump.rdb
│  ├── nodes-6002.conf
│  └── redis.conf
├── node6003
│  ├── appendonly.aof
│  ├── dump.rdb
│  ├── nodes-6003.conf
│  └── redis.conf
├── node6004
│  ├── appendonly.aof
│  ├── dump.rdb
│  ├── nodes-6004.conf
│  └── redis.conf
├── node6005
│  ├── appendonly.aof
│  ├── dump.rdb
│  ├── nodes-6005.conf
│  └── redis.conf
├── node6006
│  ├── appendonly.aof
│  ├── dump.rdb
│  ├── nodes-6006.conf
│  └── redis.conf
├── redis-trib.rb
├── start-all.sh
└── stop-all.sh

检查都没有问题,继续下一步。
nodes-*.conf这个文件代表集群文件

3、创建集群

执行创建脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[root@localhost redis-cluster]# ./redis-trib.rb create --replicas 1 192.168.18.140:6001 192.168.18.140:6002 192.168.18.140:6003 192.168.18.140:6004 192.168.18.140:6005 192.168.18.140:6006
>>> Creating cluster
>>> Performing hash slots allocation on 6 nodes...
Using 3 masters:
192.168.18.140:6001
192.168.18.140:6002
192.168.18.140:6003
Adding replica 192.168.18.140:6005 to 192.168.18.140:6001
Adding replica 192.168.18.140:6006 to 192.168.18.140:6002
Adding replica 192.168.18.140:6004 to 192.168.18.140:6003
>>> Trying to optimize slaves allocation for anti-affinity
[WARNING] Some slaves are in the same host as their master
M: 9a21d2551ce8531661be5ee5f57eea2c61f03dba 192.168.18.140:6001
slots:0-5460 (5461 slots) master
M: 050dc8a4a6c58ebe7deda200463a035d19637ced 192.168.18.140:6002
slots:5461-10922 (5462 slots) master
M: 1642f2248dd4e98bdd81971613f0c2afb1531127 192.168.18.140:6003
slots:10923-16383 (5461 slots) master
S: 5b5bf9e6f0f4c84abbaa93a61a9b3d9168411a09 192.168.18.140:6004
replicates 050dc8a4a6c58ebe7deda200463a035d19637ced
S: ff5f6edf1fe07f953796ecd19dda50d3a934ca3f 192.168.18.140:6005
replicates 1642f2248dd4e98bdd81971613f0c2afb1531127
S: b71c3f2524dd94c801cf84a9c90577991e98d34f 192.168.18.140:6006
replicates 9a21d2551ce8531661be5ee5f57eea2c61f03dba
Can I set the above configuration? (type 'yes' to accept):

以上信息说明了集群配置OK,master和salve都分配好,是否确认创建集群。yes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Can I set the above configuration? (type 'yes' to accept): yes
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join..
>>> Performing Cluster Check (using node 192.168.18.140:6001)
M: 9a21d2551ce8531661be5ee5f57eea2c61f03dba 192.168.18.140:6001
slots:0-5460 (5461 slots) master
1 additional replica(s)
M: 050dc8a4a6c58ebe7deda200463a035d19637ced 192.168.18.140:6002
slots:5461-10922 (5462 slots) master
1 additional replica(s)
S: 5b5bf9e6f0f4c84abbaa93a61a9b3d9168411a09 192.168.18.140:6004
slots: (0 slots) slave
replicates 050dc8a4a6c58ebe7deda200463a035d19637ced
M: 1642f2248dd4e98bdd81971613f0c2afb1531127 192.168.18.140:6003
slots:10923-16383 (5461 slots) master
1 additional replica(s)
S: ff5f6edf1fe07f953796ecd19dda50d3a934ca3f 192.168.18.140:6005
slots: (0 slots) slave
replicates 1642f2248dd4e98bdd81971613f0c2afb1531127
S: b71c3f2524dd94c801cf84a9c90577991e98d34f 192.168.18.140:6006
slots: (0 slots) slave
replicates 9a21d2551ce8531661be5ee5f57eea2c61f03dba
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

集群创建成功!

验证、登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
redis-cli -c -h 192.168.37.131 -p 7001 ,其中-c表示以集群方式连接redis,-h指定ip地址,-p指定端口号
[root@localhost redis-cluster]# ../redis/bin/redis-cli -c -h 192.168.18.140 -p 6001

查看集群节点

192.168.18.140:6001> cluster nodes
050dc8a4a6c58ebe7deda200463a035d19637ced 192.168.18.140:6002@16002 master - 0 1550668881045 2 connected 5461-10922
5b5bf9e6f0f4c84abbaa93a61a9b3d9168411a09 192.168.18.140:6004@16004 slave 050dc8a4a6c58ebe7deda200463a035d19637ced 0 1550668879000 4 connected
9a21d2551ce8531661be5ee5f57eea2c61f03dba 192.168.18.140:6001@16001 myself,master - 0 1550668878000 1 connected 0-5460
1642f2248dd4e98bdd81971613f0c2afb1531127 192.168.18.140:6003@16003 master - 0 1550668879026 3 connected 10923-16383
ff5f6edf1fe07f953796ecd19dda50d3a934ca3f 192.168.18.140:6005@16005 slave 1642f2248dd4e98bdd81971613f0c2afb1531127 0 1550668879000 5 connected
b71c3f2524dd94c801cf84a9c90577991e98d34f 192.168.18.140:6006@16006 slave 9a21d2551ce8531661be5ee5f57eea2c61f03dba 0 1550668880036 6 connected

查看集群信息

192.168.18.140:6001> cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:1
cluster_stats_messages_ping_sent:829
cluster_stats_messages_pong_sent:843
cluster_stats_messages_sent:1672
cluster_stats_messages_ping_received:838
cluster_stats_messages_pong_received:829
cluster_stats_messages_meet_received:5
cluster_stats_messages_received:1672
192.168.18.140:6001>

4、新添加主节点

准备好Redis节点192.168.18.140:6007并启动起来,执行添加节点命令如下:
./redis-trib.rb add-node 192.168.18.140:6007 192.168.18.140:6001

添加后的节点设置主节点或从节点

hash槽重新分配

添加完主节点需要对主节点进行hash槽分配这样该主节才可以存数据,就成为主节点。

redis集群有16384个槽,集群中的每个master结点分配一些槽,通过查看集群结点可以看到槽占用情况。
./redis-trib.rb reshard 192.168.18.140:6001

提示移动多少槽,输入值1000。

之后提示移动到那个节点,输入节点ID

之后提示我们指定转移哪几个几点的哈希槽,输入all 表示从所有的主节点中随机转移,凑够1000个哈希槽

然后再输入yes,redis集群就开始分配哈希槽了。

分配完成后这个节点也就成为主节点。

如果是把这个节点变成从节点,比如让新节点成为6001的从节点:

redis-cli -c -p 6007 cluster replicate 0b00721a509444db793d28448d8

命令后面的ID就是6001的ID

使用下面命令来确认一下6007是否已经成为6001的从节点

redis-cli -p 6001 cluster nodes | grep slave | grep 6001ID

5、删除节点

如果删除主节点:该节点有100个哈希槽,首先把节点的哈希槽转移到其他主节点,命令:
./redis-trib.rb reshard 192.168.18.140:6001

系统会提示我们要移动多少哈希槽,这里移动1000个,因为新节点有1000个哈希槽
然后系统提示我们输入要接收这些哈希槽的节点的ID
然后要我们选择从那些节点中转出哈希槽,这里一定要输入6007这个节点的ID,最后输入 done  表示输入完毕
最后使用命令删除节点
./redis-trib.rb del-node 192.168.18.140:6001  需要删除的节点ID

如果节点是从节点,直接使用删除命令即可。

redis-trib.rb具有以下功能:

create:创建集群 --replicas可以指定从节点个数
check:检查集群
info:查看集群信息
fix:修复集群
reshard:在线迁移slot
rebalance:平衡集群节点slot数量
add-node:将新节点加入集群
del-node:从集群中删除节点
set-timeout:设置集群节点间心跳连接的超时时间
call:在集群全部节点上执行命令
import:将外部redis数据导入集群

以集群方式登录任意节点,即可以使用这些命令

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testRedis() throws Exception {
// 创建并填充节点信息
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("192.168.18.140", 6001));
nodes.add(new HostAndPort("192.168.18.140", 6002));
nodes.add(new HostAndPort("192.168.18.140", 6003));
nodes.add(new HostAndPort("192.168.18.140", 6004));
nodes.add(new HostAndPort("192.168.18.140", 6005));
nodes.add(new HostAndPort("192.168.18.140", 6006));
// 创建集群对象
JedisCluster jedisCluster = new JedisCluster(nodes);
// 使用jedisCluster操作redis
String key = "cluster";
String setResult = jedisCluster.set(key, "hello redis cluster ");
System.out.println(setResult);
String getResult = jedisCluster.get(key);
System.out.println(getResult);
//关闭连接
jedisCluster.close();
}
上一页1…151617…25下一页
初晨

初晨

永远不要说你知道本质,更别说真相了。

249 日志
46 分类
109 标签
近期文章
  • WebSocket、Socket、TCP、HTTP区别
  • Springboot项目的接口防刷
  • 深入理解Volatile关键字及其实现原理
  • 使用vscode搭建个人笔记环境
  • HBase介绍安装与操作
© 2018 — 2020 Copyright
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4