在golang
中,map
是一种非常常用的数据结构。但是在并发环境下,map
是不安全的,会出现并发问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func main() {
m := make(map[int]bool)
// write goroutine
go func() {
for {
m[1] = true
}
}()
// read goroutine
go func() {
for {
_ = m[2]
}
}()
select {}
}
|
输出结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
$ go run main.go
fatal error: concurrent map read and map write
goroutine 18 [running]:
main.main.func2()
xxx/main.go:16 +0x28
created by main.main in goroutine 1
xxx/main.go:14 +0x76
goroutine 1 [select (no cases)]:
main.main()
main.go:20 +0x7b
goroutine 17 [runnable]:
main.main.func1()
main.go:9 +0x28
created by main.main in goroutine 1
main.go:7 +0x49
exit status 2
|
上述代码中,我们开了两个goroutine
,一个用于写,一个来读。一运行就报错了,哪怕读写的不是同一个key。
golang
官方是这么说的:
Map access is unsafe only when updates are occurring. As long as all goroutines are only reading—looking up elements in the map, including iterating through it using a for range loop—and not changing the map by assigning to elements or doing deletions, it is safe for them to access the map concurrently without synchronization.
考虑到有性能损失,官方没有将map
设计成原子操作。在并发读写时会有问题。
关于map
的源码,在runtime/map.go
中有,这里只截取并发检测的部分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
const (
hashWriting = 4 // a goroutine is writing to the map
)
// A header for a Go map.
type hmap struct {
flags uint8
}
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// ...
// 往map写数据时,会将flags的hashWriting位置为1
h.flags ^= hashWriting
// ...
// 最后清掉hashWriting位的1
h.flags &^= hashWriting
}
// 各个读取的地方
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}
|
简单来说在map
写入时,会对flags
的hashWriting
位置为1,读取时会检查flags
的hashWriting
位是否为1,如果为1则报错。
第一种解决方案,就是加个读写锁(sync.RWMutex
)。也是最简单的方案:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
type RWMap struct {
sync.RWMutex
m map[string]int
}
func (m *RWMap) Get(key string) (int, bool) {
// 读锁保护
m.RLock()
defer m.RUnlock()
val, ok := m.m[key]
return val, ok
}
func (m *RWMap) Set(key string, val int) {
// 写锁保护
m.Lock()
defer m.Unlock()
m.m[key] = val
}
func (m *RWMap) Del(key string) {
// 写锁保护
m.Lock()
defer m.Unlock()
delete(m.m, key)
}
func (m *RWMap) Len() int {
// 读锁保护
m.RLock()
defer m.RUnlock()
return len(m.m)
}
func (m *RWMap) Range(f func(key string, val int) bool) {
// 读锁保护
m.RLock()
defer m.RUnlock()
for k, v := range m.m {
if !f(k, v) {
break
}
}
}
|
这段代码中,我们对map
的几个常用操作进行了封装,加了读写锁。这样就可以在并发环境下安全的使用map
了。
golang
官方提供了一个并发安全的map
实现:sync.Map
这里只介绍它的两个方法,只于其他方法和实现不是本文重点:
1
2
3
4
5
6
7
|
package sync // import "sync"
func (m *Map) Store(key, value any)
将键值对存储到Map中
func (m *Map) Load(key any) (value any, ok bool)
第一个返回值是key对应的value,第二个返回值表示是否存在这个key
|
写了个简单的例子,着重看下是否可以并发安全的使用sync.Map
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func main() {
sm := sync.Map{}
wg := sync.WaitGroup{}
// 100个goroutine并发写入sync.Map
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
sm.Store(i, i)
}(i)
}
// 100个goroutine并发读取sync.Map
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
if value, ok := sm.Load(i); ok {
fmt.Println(value)
}
}(i)
}
wg.Wait()
}
|
读写端各自开了100个goroutine
去读写sync.Map
,最后没有报错,说明sync.Map
是并发安全的。
加个读写锁很简单,但是在高并发的情况下,读写锁很可能会成为性能瓶颈。而sync.Map
虽然并发安全,但是它的适用场景有限。只适合读多写少的场景。
参考官方faq:
As an aid to correct map use, some implementations of the language contain a special check that automatically reports at run time when a map is modified unsafely by concurrent execution. Also there is a type in the sync library called sync.Map that works well for certain usage patterns such as static caches, although it is not suitable as a general replacement for the builtin map type.
可见如果对性能敏感的话,sync.Map
也不是很好的选择。
让我们思考下,想实现并发安全的话,要么加锁,要么不加锁(类似redis
的单线程的思路)。但是在golang
中,必定会有大量并发请求,所以还是要考虑锁。如何提高锁的效率呢?
- 降低锁的持有时间
- 减少锁的粒度
针对1,这是业务代码要考虑的,不是底层库(如并发安全的map
)首要考虑的。那就考虑2,减少锁的粒度。
分片锁/分段锁就是减少锁的粒度一种思路。它将一把锁拆分成了多个小的锁,每个小锁负责一部分的数据。这样在读写时,只需要锁住对应的小锁,而不是整个大锁。从而减少了锁的粒度。
github中有一个star数挺多的库:concurrent-map,它就是一个分片锁的map
实现。
本质上通过将map
拆分成多个shard
(根据对key进行某种算法拆分),每个shard
有自己的读写锁。这样在读写时,只需要锁住对应的shard
,而不是整个map
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
var SHARD_COUNT = 32
// 最多有SHARD_COUNT个shard
type ConcurrentMap[K comparable, V any] struct {
shards []*ConcurrentMapShared[K, V]
// sharding, 我理解为shards下标的算法
sharding func(key K) uint32
}
type ConcurrentMapShared[K comparable, V any] struct {
items map[K]V
sync.RWMutex // Read Write mutex, guards access to internal map.
}
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
m := ConcurrentMap[K, V]{
sharding: sharding,
shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
}
for i := 0; i < SHARD_COUNT; i++ {
// 对每个shard初始化
m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
}
return m
}
// Creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
return create[string, V](fnv32)
}
// GetShard returns shard under given key
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
// 根据key计算出shard的下标
return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}
// Sets the given value under the specified key.
func (m ConcurrentMap[K, V]) Set(key K, value V) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
// 通过写锁保护shard中的数据, 对shard中的items进行写操作
shard.items[key] = value
shard.Unlock()
}
// Get retrieves an element from map under given key.
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
// 读锁保护, 读取shard中的数据
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
|
核心是通过一个GetShard
的方法,获取对应的shard,然后对shard进行读写操作(操作时有读写锁保护)。
简单总结下,有三个解决方案:
- 对
golang
的map
进行加锁,如sync.RWMutex
- 使用
sync.Map
- 使用分片锁,如
concurrent-map