MonitorMixin Source Code

2020-08-14

用法示例

require 'monitor'

class User
  attr_accessor :print_item
end
user = User.new
user.extend MonitorMixin

proc = Proc.new do |user|
  raise 'Must be a MonitorMixin' unless user.is_a?(MonitorMixin)

  user.synchronize do
    puts Thread.current
    5.times do
      sleep 0.1
      print user.print_item
    end
    puts ""
  end
end

sub_thread = Thread.new do
  user.mon_enter
  user.print_item = 'Sub '
  proc.call(user)
  user.mon_exit
end

user.synchronize do
  user.print_item = 'Main '
  proc.call(user)
  sleep 0.5
  puts 'Main thread release lock.'
end

sub_thread.join

=begin
#<Thread:0x00007f974e064040 run>
Main Main Main Main Main
Main thread release lock.
#<Thread:0x00007f974e047710@t.rb:22 run>
Sub Sub Sub Sub Sub

=end

在了解源码前我们先看一个用例.

user 是一个普通对象, 它 extendMonitorMixin 模块.

proc 是一个闭包, 它期待传入一个支持 MonitorMixin 的对象, 并在该对象上调用 synchronize, 希望不被打断地, 一边休息一边打印 5 个 print_item.

sub_thread 线程通过 mon_enter 进入锁, 先把 userprint_item 设为 Sub , 然后调用闭包.

主线程通过 synchronize 进入锁, 先把 userprint_item 设为 Main , 然后调用闭包, 执行完闭包后睡一会儿再释放锁.

一般情况下主线程会首先抢占到锁, 会不被打断地打印五个 Main, 等主线程释放锁之后 sub_thread 才执行一系列操作.

Source Code

这个用例的要点是, 闭包内有一层 synchronize 的包装, 线程的操作也有 synchronize 或者 mon_enter 的包装, 也就是典型的可重入锁的场景.

Ruby 的 Mutex 本身是不可重入锁, 那么 MonitorMixin 是怎么实现的呢, 来看一下源码:

user.extend MonitorMixin 会触发 Ruby 的方法回调:

  def self.extend_object(obj)
    super(obj)
    obj.__send__(:mon_initialize)
  end
  def mon_initialize
    if defined?(@mon_mutex) && @mon_mutex_owner_object_id == object_id
      raise ThreadError, "already initialized"
    end
    @mon_mutex = Thread::Mutex.new
    @mon_mutex_owner_object_id = object_id
    @mon_owner = nil
    @mon_count = 0
  end

mon_initialize 通过检查 @mon_mutex@mon_mutex_owner_object_id 来确定 user 是第一次初始化, 也就是要求 user.extend MonitorMixin 执行且只能执行一次. 然后设置一系列实例变量:

  • @mon_mutex 初始化为新的 Mutex 对象, 锁操作都会被代理到这个 Mutex 对象上;
  • @mon_mutex_owner_object_id 初始化为 userobject_id;
  • @mon_owner 初始化为 nil, 表示目前还没有线程占有 mon_mutex 锁;
  • @mon_owner 初始化 为 0, 表示重入的锁层次为 0;

至此, user 就可以使用 synchronize 或者 mon_enter 来使用锁了.

  def mon_enter
    if @mon_owner != Thread.current
      @mon_mutex.lock
      @mon_owner = Thread.current
      @mon_count = 0
    end
    @mon_count += 1
  end

@mon_owner 指向持有锁的线程.

如果当前线程持有 mon_mutex 锁:

  • 那么就直接给 @mon_count 加一, 表示重入锁的深度加一了;

如果当前线程未持有 mon_mutex 锁:

  • 当没有任何线程持有锁, 当前线程直接通过 @mon_mutex.lock 抢占锁, 并设置 @mon_owner 指向当前线程, 设置锁重入深度为 0.
  • @mon_owner 正指向其他线程, 当前线程直接通过 @mon_mutex.lock 阻塞地等待锁.
  def mon_exit
    mon_check_owner
    @mon_count -= 1
    if @mon_count == 0
      @mon_owner = nil
      @mon_mutex.unlock
    end
  end

  def mon_check_owner
    if @mon_owner != Thread.current
      raise ThreadError, "current thread not owner"
    end
  end

退出锁, 首先确认当前线程持有锁, 然后重入锁层次减一. 如果锁层次减到 0, @mon_owner 指向 nil, 并释放锁.

  def mon_synchronize
    mon_enter
    begin
      yield
    ensure
      mon_exit
    end
  end

  alias synchronize mon_synchronize

synchronizemon_entermon_exit 的组合.

小结

MonitorMixin 通过引入重用锁的层次计数器, 来实现了 Mutex 的可重入效果.

实现中要特别注意 @mon_owner 的检查, 在当前线程没有持有锁的情况下操作锁, 会抛 ThreadError.