某次面试时被问到了 Go 当中支持并发的 map,本以为靠着了解 sync.Map
当中的 read and dirty 两层 map 的结构就能够应付过去,但深入一问如何修改,就答不太清楚了。
希望本篇文章能够帮助大家了解到 sync.Map
的结构组成和如何读写的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| type Map struct {
mu Mutex
// 一个原子变量,里面包括一个 map 以及表示 dirty 是否存在 read 当中没有的 entry 的布尔值
// 一般不需要带锁读写
read atomic.Value // readOnly
// 需要带锁读写
dirty map[any]*entry
// 记录 read 当中查询不到需要到 dirty 查询的次数
// 当 misses 达到一定次数时,会将 dirty 整个复制到 read 当中
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}
// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(any))
|
从 read 中取出 map,判断 key 是否在 read 中
如果 key 不在 read 中
如果 amend,说明 dirty 有 read 没有的变量,需要到 dirty 中检查
- 上锁,再次检查 read 是否有相应的 key
- 没有则到 dirty 里面查询,并增加 misses
- 返回 dirty 查询的结果
否则返回 read 查询的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| func (m *Map) Load(key any) (value any, ok bool) {
read, _ := m.read.Load().(readOnly) // 取出 read 中的 map
e, ok := read.m[key]
if !ok && read.amended { // entry 不在 read 但是 dirty 当中存在一些 read 没有的 entry,所以需要到 dirty 看看
m.mu.Lock()
// 防止取锁等待的时候, read 被添加了相应 key 的 entry,重新判断一次(双重判断)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 无论 dirty 是否有这个 key,都得增加 misses,因为都需要进来 dirty 查询了
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load() // 从 entry 中取出 value,如果指针为空,或者指针已经 expunged,则返回空值,即不存在
}
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}
|
这里的 misses 记录,用于记录访问到 dirty 的次数。当如果查询访问落到 dirty 的次数太多,表明查询性能下降,所以直接把 dirty 复制给 read,并把 dirty 清空,misses 清零
1
2
3
4
5
6
7
8
9
| func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
|
先检查 read 中有没有,如果有则判断是否被删除
- 没删除则直接写入到 read 中,返回
上锁
如果 read 中有,且被删除,则将 entry 的状态改成未删除,写入到 dirty,更新 entry 的值
如果 read 中没有但 dirty 有相应的 entry,则直接更新 entry 的值
如果都没有,更新 amended,写入新 entry 到 dirty
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| func (m *Map) Store(key, value any) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) { // 如果 read 当中有相应的 entry,则直接尝试乐观锁 CAS 写入 read
return // 如果 entry 已经在 dirty 删除了,则返回 false
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() { // 通过 CAS 的方式将删除状态从 expunged 变成未删除,从而确保状态是未删除
// 如果成功说明此前是删除状态是 expungd,则 dirty 中的 entry 已删除,需要重新添加
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended { // 更新 amended
m.dirtyLocked() // 确保 dirty 存在,不存在则通过将 read 中所有元素复制过去进行初始化
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
|
- 找到 entry,调用 delete 方法把 entry 设成空指针,这里也是用 CAS 的方式来设置空指针,如果已经是空或者设成删除,则直接返回
- 如果 entry 不在 read 但在 dirty 里面,则把 dirty 里 key 和 entry 的对应关系也删掉
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 如果 read 中不存在且 dirty 中可能存在
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked() // 删除时的查询也需要记录 misses
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) { // 把 entry 变成空指针
return *(*any)(p), true
}
}
}
|
sync.Map
不支持内置 range 遍历方法,需要通过其提供的 Range 方法,传入函数进行遍历
遍历前会先检查一遍 amended,如果 dirty 有新数据,则把 dirty 转移到 read 当中
最终遍历的是 read 当中的 map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| func (m *Map) Range(f func(key, value any) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
// 马上转移数据
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok { // 如果 entry 已经被删除
continue
}
if !f(k, v) {
break
}
}
}
|
Load 情况下:
- read 不存在 key 但 amended 为 true:dirty 中有新写入的 read 中没有的数据,所以需要加锁到 dirty 里面查询,并且修改 misses 次数,如果 misses 次数过多还需要重新迁移 dirty 和 read
Store 情况下:
- key 值在 read 中不存在
- key 值在 read 中存在但 entry 被删除了
- 上面的情况都需要进入到 dirty 进行写入更新
Delete 情况下:
- read 不存在 key 但 amended 为 true
- 空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
- 使用只读数据(read),避免读写冲突。
- 动态调整,miss次数多了之后,将dirty数据提升为read。
- double-checking。
- 延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。
- 优先从read读取、更新、删除,因为对read的读取不需要锁。