1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
// process is used to process the Mark channel. This is not thread-safe,
// so only run one goroutine for process. One is sufficient, because
// all goroutine ops use purely memory and cpu.
// Each index has to emit atleast one begin watermark in serial order otherwise waiters
// can get blocked idefinitely. Example: We had an watermark at 100 and a waiter at 101,
// if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it
// can't decide whether the task at 101 has decided not to emit watermark or it didn't get
// scheduled yet.
func (w *WaterMark) process(closer *z.Closer) {
defer closer.Done()
// 创建一个堆
var indices uint64Heap
// pending maps raft proposal index to the number of pending mutations for this proposal.
// 记录并发冲突值的用于检测的实例
pending := make(map[uint64]int)
// 存储回调channel, 一个时间戳上可以等待多个channel, 在orcale.readTs()中的waitForMark()
waiters := make(map[uint64][]chan struct{})
// 初始化堆
heap.Init(&indices)
// 真正执行逻辑的闭包函数
processOne := func(index uint64, done bool) {
// If not already done, then set. Otherwise, don't undo a done entry.
// 通过传入的时间戳,从pending数组中取值
prev, present := pending[index]
// 如果不存在则push进堆中
if !present {
heap.Push(&indices, index)
}
delta := 1
// 根据done判断是开始事务还是结束事务进行置位1或-1
if done {
delta = -1
}
// 如果是一个begin操作,即开启事务的标记的时候,在pending数组计数位里+1
// 如果是一个commit操作,即终止事务的标记的时候,在pending数组计数位里-1
// 让所有事务都能感知到活跃事务之间的关联
pending[index] = prev + delta
// Update mark by going through all indices in order; and checking if they have
// been done. Stop at the first index, which isn't done.
// 获取当前的水位信息
doneUntil := w.DoneUntil()
// 当前水位大于时间戳,证明已经不需要再去关注并发性了
if doneUntil > index {
// 断言结束操作
AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", w.Name, doneUntil, index)
}
until := doneUntil
loops := 0
// 循环对堆数组进行pop遍历操作,弹出最小的事务的时间戳
for len(indices) > 0 {
min := indices[0]
// 判断是否大于0,证明最小的事务时间戳没有结束
if done := pending[min]; done > 0 {
// 没有其他事务在等待,跳出循环
break // len(indices) will be > 0.
}
// Even if done is called multiple times causing it to become
// negative, we should still pop the index.
// done <= 0 则说明事务已经提交,删除它曾经存在的痕迹
heap.Pop(&indices)
delete(pending, min)
// 水位移动
until = min
loops++
}
// 判断水位是否发生了变化
if until != doneUntil {
// 有所变化则通过cas赋值
AssertTrue(atomic.CompareAndSwapUint64(&w.doneUntil, doneUntil, until))
}
// 唤醒操作的闭包
notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
// 遍历通知channel的数组,一个个close掉
for _, ch := range toNotify {
close(ch)
}
// 在waiters中移除对应的时间戳
delete(waiters, idx) // Release the memory back.
}
// 如果水位发生移动
if until-doneUntil <= uint64(len(waiters)) {
// Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop
// can hog up CPU just iterating over integers creating a busy-wait loop. So, only do
// this path if until - doneUntil is less than the number of waiters.
// 遍历原水位到当前水位
for idx := doneUntil + 1; idx <= until; idx++ {
// 把水位中的index拿出,得到回调函数的channel
if toNotify, ok := waiters[idx]; ok {
// 进行逐个关闭
notifyAndRemove(idx, toNotify)
}
}
} else {
for idx, toNotify := range waiters {
if idx <= until {
notifyAndRemove(idx, toNotify)
}
}
} // end of notifying waiters.
}
// 此方法的主体,循环for-select处理
for {
select {
// 关闭任务
case <-closer.HasBeenClosed():
return
// 接收markChannel, 100长的channel
case mark := <-w.markCh:
// 判断有无水位的信息
if mark.waiter != nil {
// 获取已提交事务的最大版本号的水位
doneUntil := atomic.LoadUint64(&w.doneUntil)
// 比较时间戳大小关系,如果当前已提交事务时间戳大于读时间戳,不需要等待,直接close
if doneUntil >= mark.index {
close(mark.waiter)
} else {
// 否则的话,读时间戳大于水位时间戳
// 在之前有未完成的活跃事务,不能获取读取时间戳,否则可能读取道脏数据
// 创建waiters
ws, ok := waiters[mark.index]
if !ok {
// 如果该读时间戳未在waiters中存在,创建channel数组
waiters[mark.index] = []chan struct{}{mark.waiter}
} else {
// 如果不空的话,说明之前已经有其他的事务在找个时间戳上等待,直接append
waiters[mark.index] = append(ws, mark.waiter)
}
}
// 读取时间戳和提交时间戳的操作都没有mark对象
} else {
// 如果当前时间戳是有效的
if mark.index > 0 {
// 对这个时间戳调用闭包进行逻辑处理
// mark.done是一个bool值,在begin的时候是false,在完成的时候为true
processOne(mark.index, mark.done)
}
// 遍历堆数组,对所有的节点进行一次处理逻辑
for _, index := range mark.indices {
processOne(index, mark.done)
}
}
}
}
}
|