Sync.Map 深入学习

某次面试时被问到了 Go 当中支持并发的 map,本以为靠着了解 sync.Map 当中的 read and dirty 两层 map 的结构就能够应付过去,但深入一问如何修改,就答不太清楚了。

希望本篇文章能够帮助大家了解到 sync.Map 的结构组成和如何读写的。

结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Map struct {
	mu Mutex

        // 一个原子变量,里面包括一个 map 以及表示 dirty 是否存在 read 当中没有的 entry 的布尔值
        // 一般不需要带锁读写
	read atomic.Value // readOnly

        // 需要带锁读写
	dirty map[any]*entry

        // 记录 read 当中查询不到需要到 dirty 查询的次数
        // 当 misses 达到一定次数时,会将 dirty 整个复制到 read 当中
	misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
	m       map[any]*entry
	amended bool // true if the dirty map contains some key not in m.
}

// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(any))

读写方法实现

读 / Load

  1. 从 read 中取出 map,判断 key 是否在 read 中

  2. 如果 key 不在 read 中

    1. 如果 amend,说明 dirty 有 read 没有的变量,需要到 dirty 中检查

      1. 上锁,再次检查 read 是否有相应的 key
      2. 没有则到 dirty 里面查询,并增加 misses
      3. 返回 dirty 查询的结果
  3. 否则返回 read 查询的结果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (m *Map) Load(key any) (value any, ok bool) {
	read, _ := m.read.Load().(readOnly) // 取出 read 中的 map
	e, ok := read.m[key]
	if !ok && read.amended { // entry 不在 read 但是 dirty 当中存在一些 read 没有的 entry,所以需要到 dirty 看看
		m.mu.Lock()
		// 防止取锁等待的时候, read 被添加了相应 key 的 entry,重新判断一次(双重判断)
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended { 
			e, ok = m.dirty[key]
			// 无论 dirty 是否有这个 key,都得增加 misses,因为都需要进来 dirty 查询了
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load() // 从 entry 中取出 value,如果指针为空,或者指针已经 expunged,则返回空值,即不存在
}

func (e *entry) load() (value any, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*any)(p), true
}

这里的 misses 记录,用于记录访问到 dirty 的次数。当如果查询访问落到 dirty 的次数太多,表明查询性能下降,所以直接把 dirty 复制给 read,并把 dirty 清空,misses 清零

1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

写 / Store

  1. 先检查 read 中有没有,如果有则判断是否被删除

    1. 没删除则直接写入到 read 中,返回
  2. 上锁

  3. 如果 read 中有,且被删除,则将 entry 的状态改成未删除,写入到 dirty,更新 entry 的值

  4. 如果 read 中没有但 dirty 有相应的 entry,则直接更新 entry 的值

  5. 如果都没有,更新 amended,写入新 entry 到 dirty

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (m *Map) Store(key, value any) {
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) { // 如果 read 当中有相应的 entry,则直接尝试乐观锁 CAS 写入 read
		return                                      // 如果 entry 已经在 dirty 删除了,则返回 false

	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() { // 通过 CAS 的方式将删除状态从 expunged 变成未删除,从而确保状态是未删除
			// 如果成功说明此前是删除状态是 expungd,则 dirty 中的 entry 已删除,需要重新添加
			m.dirty[key] = e
		}
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		e.storeLocked(&value)
	} else {
		if !read.amended { // 更新 amended
			m.dirtyLocked() // 确保 dirty 存在,不存在则通过将 read 中所有元素复制过去进行初始化
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}


func (e *entry) tryStore(i *any) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

删除 / Delete

  1. 找到 entry,调用 delete 方法把 entry 设成空指针,这里也是用 CAS 的方式来设置空指针,如果已经是空或者设成删除,则直接返回
  2. 如果 entry 不在 read 但在 dirty 里面,则把 dirty 里 key 和 entry 的对应关系也删掉
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended { // 如果 read 中不存在且 dirty 中可能存在
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			delete(m.dirty, key)
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked() // 删除时的查询也需要记录 misses
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete()
	}
	return nil, false
}

func (e *entry) delete() (value any, ok bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return nil, false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) { // 把 entry 变成空指针
			return *(*any)(p), true
		}
	}
}

Range 方法

sync.Map 不支持内置 range 遍历方法,需要通过其提供的 Range 方法,传入函数进行遍历

遍历前会先检查一遍 amended,如果 dirty 有新数据,则把 dirty 转移到 read 当中

最终遍历的是 read 当中的 map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (m *Map) Range(f func(key, value any) bool) {
	read, _ := m.read.Load().(readOnly)
	if read.amended {
		// 马上转移数据
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok { // 如果 entry 已经被删除
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

上锁情况

  1. Load 情况下:

    1. read 不存在 key 但 amended 为 true:dirty 中有新写入的 read 中没有的数据,所以需要加锁到 dirty 里面查询,并且修改 misses 次数,如果 misses 次数过多还需要重新迁移 dirty 和 read
  2. Store 情况下:

    1. key 值在 read 中不存在
    2. key 值在 read 中存在但 entry 被删除了
    3. 上面的情况都需要进入到 dirty 进行写入更新
  3. Delete 情况下:

    1. read 不存在 key 但 amended 为 true

优化点

  1. 空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
  2. 使用只读数据(read),避免读写冲突。
  3. 动态调整,miss次数多了之后,将dirty数据提升为read。
  4. double-checking。
  5. 延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。
  6. 优先从read读取、更新、删除,因为对read的读取不需要锁。