Golang usefull closed channel

2020-10-26

源码来源

https://github.com/adonovan/gopl.io/blob/master/ch9/memo4/memo.go

需求分析

由于 Golang 的 GC 会处理 channel, 所以一般情况下不需要显示地关闭 channel.

这个例子展示了如何利用已经关闭了的 channel 的特性:

当一个 channel 已经关闭后, <-ch 会立即返回零值.

how to use:

package main

import (
	"fmt"
	"sync"
	"time"
)

var cache = New(fetch)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go getValue(&wg)
	}
	wg.Wait()
	fmt.Println("-----")
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go getValue(&wg)
	}
	wg.Wait()
}

func getValue(wg *sync.WaitGroup) {
	defer wg.Done()

	start := time.Now()
	r, _ := cache.Get("Hi")
	fmt.Printf("%s, %s\n", r, time.Now().Sub(start))
}

func fetch(key string) (interface{}, error) {
	time.Sleep(1 * time.Second)
	fmt.Println("Calculating...")
	return "<" + key + ">", nil
}

fetch 是个耗时方法, cache 想缓存他的结果, 当再次请求时立即返回缓存值.

我们期待的输出如下:

  • 第一次请求触发耗时计算, 之后的请求直接使用缓存;
  • 如果并发请求一个耗时计算, 第一个抢到锁的任务进行计算, 其他任务通过缓存获取.
/*
Calculating...
<Hi>, 1.003371641s
<Hi>, 1.003400981s
<Hi>, 1.003390591s
-----
<Hi>, 833ns
<Hi>, 486ns
<Hi>, 694ns
*/

实现

package main

import "sync"

type timeConsumingFunc func(key string) (interface{}, error)

type entry struct {
	value interface{}
	err   error
	ready chan struct{}
}

// entry collection
type Cache struct {
	f     timeConsumingFunc
	lock  sync.Mutex
	cache map[string]*entry
}

func New(f timeConsumingFunc) *Cache {
	return &Cache{
		f:     f,
		cache: make(map[string]*entry),
	}
}

func (c *Cache) Get(key string) (value interface{}, err error) {
	c.lock.Lock()
	entryPt := c.cache[key]
	if entryPt == nil {
		// First Get
		entryPt = &entry{ready: make(chan struct{})}
		c.cache[key] = entryPt
		c.lock.Unlock()

		entryPt.value, entryPt.err = c.f(key)
		close(entryPt.ready)
	} else {
		// by cache
		c.lock.Unlock()
		<-entryPt.ready
	}
	// get value and error by pointer
	return entryPt.value, entryPt.err
}

最巧妙的设计是, Cache 集合中, 存放的不是 entry, 而是 entry 的指针.

flow-one: 无并发第一次访问时

代码等效于:

  c.lock.Lock()
	entryPt := c.cache[key]
	entryPt = &entry{ready: make(chan struct{})}
	c.cache[key] = entryPt
  c.lock.Unlock()

	entryPt.value, entryPt.err = c.f(key)
	close(entryPt.ready)
	return entryPt.value, entryPt.err

flow-two: 无并发再次访问时

代码等效于:

  c.lock.Lock()
	entryPt := c.cache[key]
	c.lock.Unlock()
	<-entryPt.ready
	return entryPt.value, entryPt.err

entryPt.ready 已经处于关闭的状态了, 执行到这里会立即返回空字符串.

当第一次访问存在并发

第一个抢到锁的 goroutine 会按照 flow-one 执行, 在 c.lock.Unlock() 之前, entryPt 已经设置到 cache 中了, 但是具体的结果还没计算.

锁释放后, 第二个抢到锁的 goroutine 得以进入, 代码等效于 flow-two, 此时的 entryPt 不是 nil, 他顺序执行, 释放锁, 然后阻塞地等待 <-entryPt.ready 的消息.

第二个 goroutine 释放锁后, 接着有另一个 goroutine 抢到锁, 同第二个一样的流程, 也阻塞地等待 <-entryPt.ready 的消息, 如果还有其他的并发也同样.

他们都在等第一个抢到锁的 goroutine 执行完:

	entryPt.value, entryPt.err = c.f(key)
	close(entryPt.ready)

关闭的 channel 立即返回零值, <-entryPt.ready 有了消息, 这些并发的 goroutine 通过指针 entryPt 得到了已经缓存的计算结果.