golang map并发问题解决方案汇总

问题描述

golang中,map是一种非常常用的数据结构。但是在并发环境下,map是不安全的,会出现并发问题。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func main() {
	m := make(map[int]bool)

    // write goroutine
	go func() {
		for {
			m[1] = true
		}
	}()

	// read goroutine
	go func() {
		for {
			_ = m[2]
		}
	}()

	select {}
}

输出结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
$ go run main.go 
fatal error: concurrent map read and map write

goroutine 18 [running]:
main.main.func2()
        xxx/main.go:16 +0x28
created by main.main in goroutine 1
        xxx/main.go:14 +0x76

goroutine 1 [select (no cases)]:
main.main()
        main.go:20 +0x7b

goroutine 17 [runnable]:
main.main.func1()
        main.go:9 +0x28
created by main.main in goroutine 1
        main.go:7 +0x49
exit status 2

上述代码中,我们开了两个goroutine,一个用于写,一个来读。一运行就报错了,哪怕读写的不是同一个key。

源码分析

golang官方是这么说的:

Map access is unsafe only when updates are occurring. As long as all goroutines are only reading—looking up elements in the map, including iterating through it using a for range loop—and not changing the map by assigning to elements or doing deletions, it is safe for them to access the map concurrently without synchronization.

考虑到有性能损失,官方没有将map设计成原子操作。在并发读写时会有问题。

关于map的源码,在runtime/map.go中有,这里只截取并发检测的部分:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
const (
    hashWriting  = 4 // a goroutine is writing to the map
)

// A header for a Go map.
type hmap struct {
    flags     uint8
}

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    // ...
    // 往map写数据时,会将flags的hashWriting位置为1
    h.flags ^= hashWriting
    // ...
    // 最后清掉hashWriting位的1
    h.flags &^= hashWriting
}

// 各个读取的地方
if h.flags&hashWriting != 0 {
    fatal("concurrent map read and map write")
}

简单来说在map写入时,会对flagshashWriting位置为1,读取时会检查flagshashWriting位是否为1,如果为1则报错。

解决方案

加锁

第一种解决方案,就是加个读写锁(sync.RWMutex)。也是最简单的方案:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type RWMap struct {
	sync.RWMutex
	m map[string]int
}

func (m *RWMap) Get(key string) (int, bool) {
    // 读锁保护
	m.RLock()
	defer m.RUnlock()
	val, ok := m.m[key]
	return val, ok
}

func (m *RWMap) Set(key string, val int) {
    // 写锁保护
	m.Lock()
	defer m.Unlock()
	m.m[key] = val
}

func (m *RWMap) Del(key string) {
    // 写锁保护
	m.Lock()
	defer m.Unlock()
	delete(m.m, key)
}

func (m *RWMap) Len() int {
    // 读锁保护
	m.RLock()
	defer m.RUnlock()
	return len(m.m)
}

func (m *RWMap) Range(f func(key string, val int) bool) {
    // 读锁保护
	m.RLock()
	defer m.RUnlock()

	for k, v := range m.m {
		if !f(k, v) {
			break
		}
	}
}

这段代码中,我们对map的几个常用操作进行了封装,加了读写锁。这样就可以在并发环境下安全的使用map了。

sync.Map

golang官方提供了一个并发安全的map实现:sync.Map

这里只介绍它的两个方法,只于其他方法和实现不是本文重点:

1
2
3
4
5
6
7
package sync // import "sync"

func (m *Map) Store(key, value any)
    将键值对存储到Map中

func (m *Map) Load(key any) (value any, ok bool)
    第一个返回值是key对应的value,第二个返回值表示是否存在这个key

写了个简单的例子,着重看下是否可以并发安全的使用sync.Map:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
	sm := sync.Map{}
	wg := sync.WaitGroup{}

	// 100个goroutine并发写入sync.Map
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sm.Store(i, i)
		}(i)
	}

	// 100个goroutine并发读取sync.Map
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if value, ok := sm.Load(i); ok {
				fmt.Println(value)
			}
		}(i)
	}

	wg.Wait()
}

读写端各自开了100个goroutine去读写sync.Map,最后没有报错,说明sync.Map是并发安全的。

分片锁

加个读写锁很简单,但是在高并发的情况下,读写锁很可能会成为性能瓶颈。而sync.Map虽然并发安全,但是它的适用场景有限。只适合读多写少的场景。

参考官方faq:

As an aid to correct map use, some implementations of the language contain a special check that automatically reports at run time when a map is modified unsafely by concurrent execution. Also there is a type in the sync library called sync.Map that works well for certain usage patterns such as static caches, although it is not suitable as a general replacement for the builtin map type.

可见如果对性能敏感的话,sync.Map也不是很好的选择。

让我们思考下,想实现并发安全的话,要么加锁,要么不加锁(类似redis的单线程的思路)。但是在golang中,必定会有大量并发请求,所以还是要考虑锁。如何提高锁的效率呢?

  1. 降低锁的持有时间
  2. 减少锁的粒度

针对1,这是业务代码要考虑的,不是底层库(如并发安全的map)首要考虑的。那就考虑2,减少锁的粒度。

分片锁/分段锁就是减少锁的粒度一种思路。它将一把锁拆分成了多个小的锁,每个小锁负责一部分的数据。这样在读写时,只需要锁住对应的小锁,而不是整个大锁。从而减少了锁的粒度。

01

github中有一个star数挺多的库:concurrent-map,它就是一个分片锁的map实现。

本质上通过将map拆分成多个shard(根据对key进行某种算法拆分),每个shard有自己的读写锁。这样在读写时,只需要锁住对应的shard,而不是整个map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
var SHARD_COUNT = 32

// 最多有SHARD_COUNT个shard
type ConcurrentMap[K comparable, V any] struct {
	shards   []*ConcurrentMapShared[K, V]
    // sharding, 我理解为shards下标的算法
	sharding func(key K) uint32
}

type ConcurrentMapShared[K comparable, V any] struct {
	items        map[K]V
	sync.RWMutex // Read Write mutex, guards access to internal map.
}

func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
	m := ConcurrentMap[K, V]{
		sharding: sharding,
		shards:   make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
	}
	for i := 0; i < SHARD_COUNT; i++ {
        // 对每个shard初始化
		m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
	}
	return m
}

// Creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
	return create[string, V](fnv32)
}

// GetShard returns shard under given key
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
    // 根据key计算出shard的下标
	return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

// Sets the given value under the specified key.
func (m ConcurrentMap[K, V]) Set(key K, value V) {
	// Get map shard.
	shard := m.GetShard(key)
	shard.Lock()
    // 通过写锁保护shard中的数据, 对shard中的items进行写操作
	shard.items[key] = value
	shard.Unlock()
}

// Get retrieves an element from map under given key.
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
	// Get shard
	shard := m.GetShard(key)
	shard.RLock()
    // 读锁保护, 读取shard中的数据
	val, ok := shard.items[key]
	shard.RUnlock()
	return val, ok
}

核心是通过一个GetShard的方法,获取对应的shard,然后对shard进行读写操作(操作时有读写锁保护)。


简单总结下,有三个解决方案:

  1. golangmap进行加锁,如sync.RWMutex
  2. 使用sync.Map
  3. 使用分片锁,如concurrent-map
updatedupdated2024-08-162024-08-16