Redis 管道技术

管道

Redis是一种基于客户端-服务端模型以及请求/响应协议的TCP服务。这意味着通常情况下一个请求会遵循以下步骤:

  • 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。

Redis 管道技术可以在服务端未响应时,客户端可以继续向服务端发送请求,并最终一次性读取所有服务端的响应。

管道技术最显著的优势是提高了 redis 服务的性能。

pipeline的思想

1
2
3
如果client执行一些相互之间无关的命令或者不需要获取命令的返回值,那么redis允许你连续发送多条命令,而不需要等待前面命令执行完毕。
比如我们执行3条INCR命令,如果使用管道,理论上只需要一个RTT+3条命令的执行时间即可,如果不适用管道,那么可能需要额外的两个RTT时间。
管道相当于批处理脚本,相当于是命令集。

Redis的管道可以在大量数据需要一次性操作完成的时候,使用Pipeline进行批处理,将一大队的操作合并成一次操作,可以减少链路层的时间消耗。

1
2
3
4
5
6
7
8
Pipeline pipe = jedis.pipelined(); // 先创建一个pipeline的链接对象
long start_pipe = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
pipe.set(String.valueOf(i), String.valueOf(i));
}
pipe.sync(); // 获取所有的response
long end_pipe = System.currentTimeMillis();
logger.info("the pipe total time is:" + (end_pipe - start_pipe));
SCAN、SSCAN、HSCAN、ZSCAN

SCAN、SSCAN、HSCAN、ZSCAN 4 个命令,分别用于集合、哈希键及有序集等。

1
2
3
4
SCAN:命令用于迭代当前数据库中的数据库键。
SSCAN:命令用于迭代集合键中的元素。
HSCAN:命令用于迭代哈希键中的键值对。
ZSCAN:命令用于迭代有序集合中的元素(包括元素成员和元素分值)。

SCAN cursor [MATCH pattern] [COUNT count]

当需要模糊查询或者大批量时候,keys * 进行查询 key 的时候会进行堵塞,导致 redis 整体不可用,因为redis是单线程,而使用 scan 命令则不会。

scan 游标 MATCH <返回和给定模式相匹配的元素> count 每次迭代所返回的元素数量 SCAN 命令是增量的循环,每次调用只会返回一小部分的元素。

在 Redis 中的具体用法如下:

1
2
scan 0 match xttblog.com* count 5
sscan myset 0 match 胡*

SCAN 命令对应的 Jedis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Jedis jedis = pool.getResource();
ScanParams scanParams = new ScanParams();
List<String> list = new ArrayList<>();
scanParams.match(key);
scanParams.count(100);
//当 SCAN 命令的游标参数被设置为 0 时, 服务器将开始一次新的迭代, 而当服务器向用户返回值为 0 的游标时, 表示迭代已结束。
String cursor = ScanParams.SCAN_POINTER_START;
while (true) {
ScanResult<String> scan = jedis.scan(cursor, scanParams);
List<String> elements = scan.getResult();
if (CollectionUtils.isNotEmpty(elements)) {
list.addAll(elements);
}
String cursor = scan.getStringCursor();
log.info("scan:返回用于下次遍历的游标 " + cursor);
log.info("scan:返回结果 " + elements);
if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
break;
}
}
jedis.close();

其他操作类似。https://www.xttblog.com/?p=3635

注意:SCAN 命令不能保证每次返回的值都是有序的,同一个 key 有可能返回多次,不做区分,需要应用程序去处理。

一个批量删除的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//key有模糊匹配*,查找到对应的去删除,启动一个线程去查找。通过游标的方式,用管道技术操作。
private Long executeDel(Jedis jedis, String key) throws Exception {
Long count = 0L;
CountDownLatch countDownLatch = new CountDownLatch(0);
Future[] futureArray = new Future[1];
ScanParams scanParams = new ScanParams().count(1);
scanParams.match(key);
ExecutorService executor = Executors.newSingleThreadExecutor();
futureArray[0] = executor.submit(new JedisDeleteThread(jedis, scanParams, countDownLatch));
try {
//等待线程汇总
countDownLatch.await();
try {
if (null != futureArray && futureArray.length > 0) {
for (int i = 0; i < futureArray.length; i++) {
count += (long) futureArray[i].get();
}
}
} catch (InterruptedException e) {
log.error(e.getMessage());
} catch (ExecutionException e) {
log.error(e.getMessage());
}
} catch (InterruptedException e) {
log.error(e.getMessage());
}
return count;
}

//线程 执行查找和删除处理
class JedisDeleteThread implements Callable<Long> {
private Jedis jedis;
ScanParams params;
CountDownLatch countDownLatch;

public JedisDeleteThread(Jedis jedis, ScanParams params, CountDownLatch downLatch) {
this.jedis = jedis;
this.params = params;
this.countDownLatch = downLatch;
}

@Override
public Long call() throws Exception {
long sum = 0;
try {
String cursor = ScanParams.SCAN_POINTER_START;
ScanResult<String> scanResult = null;
List<String> result = null;
Pipeline pipeline = jedis.pipelined();
do {
scanResult = jedis.scan(cursor, params);
result = scanResult.getResult();
if (result != null && result.size() > 0) {
sum += result.size();
int num = 0;
for (String key : result) {
pipeline.del(key);
num++;
if (num > 500) {
num = 0;
pipeline.sync();
}
}
if (num > 0) {
pipeline.sync();
}
}
cursor = scanResult.getStringCursor();
}
while (!cursor.equals("0"));
if (pipeline != null) {
pipeline.close();
}
} catch (Exception e) {
log.error("delete key error:", e);
} finally {
countDownLatch.countDown();
jedis.close();
}
return sum;
}
}