这里结合 Ruby 的 Redis Gem 来解释两者的异同.

pipelined 管道

require 'redis'
require 'pry'
redis = Redis.new
redis.flushdb

result = redis.pipelined do
  @r1 = redis.set('k1', 'v1')
  p @r1
  # <Redis::Future [:set, "k1", "v1"]>
  @r2 = redis.get('k1')
  p @r2
  # <Redis::Future [:get, "k1"]>
  @r3 = redis.set('k2', 'v2')
  p @r3
  # <Redis::Future [:set, "k2", "v2"]>
  @r4 = redis.get('k2')
  p @r4
  # <Redis::Future [:get, "k2"]>
  begin
    @r4.value
  rescue Redis::FutureNotReady => e
    p e
    # #<Redis::FutureNotReady: Value will be available once the pipeline executes.>
  end
end
p result
["OK", "v1", "OK", "v2"]
p @r1.value, @r2.value, @r3.value, @r4.value
# "OK"
# "v1"
# "OK"
# "v2"


pipelined 很好用也很好理解, 就是把多个命令打包, 由客户端一起发出, 然后一起收到各个命令的结果.

Ruby 的 Redis Gem 要求把命令放到 pipelined 的 block 中.

在 block 结束之前, 我们都拿不到具体命令的返回值, 而是得到一个 Redis::Future 对象.

如果在该 block 内强制读取 value, 会抛出 Redis::FutureNotReady 的异常.

而在 block 外, 可以通过 value 来获取 Redis::Future 的返回值, 或者去 pipelined 返回的数组里去取对应的值.

关于实现原理,

普通的多条读写是:

写-读(耗时IO) + 写-读(耗时IO) + 写-读(耗时IO)

pipelined是:

写-写-写 + 读(耗时IO)-读-读

需要强调的一点是, pipelined 是客户端提供的功能, 通过改变读写顺序来减少读操作的耗时.

require 'redis'
redis = Redis.new
redis.flushdb

begin
  result = redis.pipelined do
    @r1 = redis.set('k1', 'v1')
    @r2 = redis.incr('k1')
    @r3 = redis.set('k2', 'v2')
    @r4 = redis.get('k2')
  end
rescue => e
  p e
    # #<Redis::CommandError: ERR value is not an integer or out of range>
ensure
  p redis.get('k1')
  # "v1"
  p redis.get('k2')
  # "v2"
end
p result
# nil

begin
  p @r1.value, @r2.value, @r3.value, @r4.value
rescue Redis::FutureNotReady => e
  p e
  # #<Redis::FutureNotReady: Value will be available once the pipeline executes.>
end

当 pipelined 中的某条操作抛出异常时, 不会打断后续的操作. 也就是说, 对 ‘v1’ 和 ‘v2’ 的set操作都已经生效了.

区别是 pipelined 会抛出相关的异常, 而且返回nil, Redis::Future 的 value 不再可用(包括发生异常以前的命令).

multi 事务

这里将 Redis 的 multi 翻译为 事务, 但是它只保证了事务的原子性, 而且不支持回滚.

require 'redis'

redis = Redis.new
redis2 = Redis.new

p redis.multi
# "OK"
p redis.set('k1', 'v1')
# "QUEUED"
p redis.get('k1')
# "QUEUED"
p redis.set('k2', 'v2')
# "QUEUED"
p redis.get('k2')
# "QUEUED"

p '---'
p redis2.get('k1')
# nil
p '---'

p redis.exec
# ["OK", "v1", "OK", "v2"]

以上是对命令行multi的直接映射, 以一条 multi 命令开启事务.

之后的每一条命令都单独发给Server, Server收到后加入该事务的待执行队列, Server只会返回客户端 QUEUED 这个字符串来表示入队成功.

当Server收到 exec 命令时, 就原子性地执行这一批命令; 当收到 discard 命令, 就丢掉之前收集的命令, 放弃执行.

require 'redis'

redis = Redis.new

p redis.multi
# "OK"
p redis.set('k1', 'v1')
# "QUEUED"
p redis.get('k1')
# "QUEUED"
p redis.incr('k1')
# "QUEUED"
p redis.set('k2', 'v2')
# "QUEUED"
p redis.get('k2')
# "QUEUED"
p redis.exec
# ["OK", "v1", #<Redis::CommandError: ERR value is not an integer or out of range>, "OK", "v2"]

其中一个命令异常时, 后续的命令也继续执行, exec 返回数组, 包含各个命令的执行结果.

require 'redis'

redis = Redis.new

begin
  result = redis.multi do
    p redis.set('k1', 'v1')
    # <Redis::Future [:set, "k1", "v1"]>
    p redis.get('k1')
    # <Redis::Future [:get, "k1"]>
    p redis.incr('k1')
    # <Redis::Future [:incr, "k1"]>
    p redis.set('k2', 'v2')
    # <Redis::Future [:set, "k2", "v2"]>
    p redis.get('k2')
    # <Redis::Future [:get, "k2"]>
  end
rescue Redis::CommandError => e
  p redis.get('k1')
  # "v1"
  p redis.get('k2')
    # "v2"
ensure
  p result
  # nil
end

WATCH and UNWATCH

watch 命令跟 multi 配合使用, 实现对某个key的乐观锁.

require 'redis'

redis = Redis.new

key = :my_key
redis.set key, 100
redis.watch key

result = redis.multi do
  redis.incr key
  redis.incr key
end

p result
p redis.get key

outputs:

[101, 102]
"102"

require 'redis'

redis = Redis.new

key = :my_key
redis.set key, 100
redis.watch key
redis.incr key

result = redis.multi do
  redis.incr key
  redis.incr key
end

p result
p redis.get key

outputs:

nil
"101"

他们的区别是第二个例子里, watch 和 multi 之间, 我们对 watch 的 key 做了值的修改.

如此一来, multi 就会失败.

我们可以手动 unwatch, unwatch 只能将所有的 watch 都清掉, 不能指定单独的key.

还有就是, 如果我们调用了 multi 的 exec 和 discard 的之后, 就不需要手动 unwatch .

watch 跨客户端的情况

在同一个client中使用 watch 和 multi :

require 'redis'

redis = Redis.new
redis1 = Redis.new
redis2 = Redis.new

redis.set 'k', 100

redis1.watch 'k'

redis.incr 'k'

result = redis1.multi do
  redis1.incr 'k'
  redis1.incr 'k'
end

p result
p redis.get 'k'

# output:
# nil
# "101"

watch 和 multi 在不同的客户端:

require 'redis'

redis = Redis.new
redis1 = Redis.new
redis2 = Redis.new

redis.set 'k', 100

redis2.watch 'k'

redis.incr 'k'

result = redis1.multi do
  redis1.incr 'k'
  redis1.incr 'k'
end

p result
p redis.get 'k'

# output:
# [102, 103]
# "103"

再加一个case:

require 'redis'

redis = Redis.new
redis1 = Redis.new
redis2 = Redis.new

redis.set 'k', 100

redis2.watch 'k'

redis.incr 'k'

result1 = redis1.multi do
  redis1.incr 'k'
  redis1.incr 'k'
end

p result1
p redis.get 'k'
# [102, 103]
# "103"


result2 = redis2.multi do
  redis2.incr 'k'
  redis2.incr 'k'
end

p result2
p redis.get 'k'
# nil
# "103"

小结:

由此证明, watch 的实现是基于连接的.

同一个连接内, multi 前有 watch 的话, 如果被监视的key发生改变(不限由谁来改动该key的值), multi 将不会执行, 并取消对该key的监视.

在多个不同的连接里, 连接A里对key进行监视, 对连接B里的multi无效, 仅仅对连接A中的下一个multi生效.

Reference

https://redis.io/commands/unwatch

https://redis.io/commands/multi

https://www.jianshu.com/p/361cb9cd13d5