1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
func Open(opt Options) (*DB, error) {
// 检查参数
checkAndSetOptions(&opt)
// 创建了三个目录锁,防止其他进程注册到同一个目录造成冲突
var dirLockGuard, valueDirLockGuard *directoryLockGuard
// Create directories and acquire lock on it only if badger is not running in InMemory mode.
// We don't have any directories/files in InMemory mode so we don't need to acquire
// any locks on them.
// 判断参数配置为只基于内存
if !opt.InMemory {
// 创建目录
createDirs(opt)
var err error
if !opt.BypassLockGuard {
// 给Dir加目录锁
dirLockGuard, _ = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
// 方法末尾释放锁
defer func() {
if dirLockGuard != nil {
_ = dirLockGuard.release()
}
}()
// 获取Key&ValuePtr的绝对路径
absDir, _ := filepath.Abs(opt.Dir)
// 获取ValueLog的绝对路径
absValueDir, _ := filepath.Abs(opt.ValueDir)
// 如果ValueDir和Dir不相同,需要各自加锁
if absValueDir != absDir {
// 给ValueDir加目录锁
valueDirLockGuard, _ = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
// 释放锁
defer func() {
if valueDirLockGuard != nil {
_ = valueDirLockGuard.release()
}
}()
}
}
}
// 打开或创建Manifest文件,(采用mmap方式打开,在后面详细展开)
manifestFile, manifest, _ := openOrCreateManifestFile(opt)
// 关闭Manifest文件
defer func() {
if manifestFile != nil {
_ = manifestFile.close()
}
}()
// 创建内存中的db数据结构
db := &DB{
// memtable, 因为有多个,所以要创建数组
imm: make([]*memTable, 0, opt.NumMemtables),
// 刷新请求的channel
flushChan: make(chan flushTask, opt.NumMemtables),
// 写请求的channel
writeCh: make(chan *request, kvWriteChCapacity),
// 配置信息opt
opt: opt,
// 刚初始化好的manifest实例
manifest: manifestFile,
// Key&ValuePtr目录锁
dirLockGuard: dirLockGuard,
// Value目录锁
valueDirGuard: valueDirLockGuard,
// Oracle的实例,一个KV引擎并发事务的管理器,负责分配事务的版本号,用来实现MVCC功能,在读写事务时详细展开
orc: newOracle(opt),
pub: newPublisher(),
allocPool: z.NewAllocatorPool(8),
bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
threshold: initVlogThreshold(&opt),
}
// Cleanup all the goroutines started by badger in case of an error.
// 关闭badger的所有任务协程的钩子函数
defer func() {
if err != nil {
opt.Errorf("Received err: %v. Cleaning up...", err)
db.cleanup()
db = nil
}
}()
// 块缓存相关配置
// LSM-T结构中SST里面数据是以块(block)为单位分割的
// 当开启块缓存之后,LSM-T会把最近被访问到的高热的块缓存在内存中,以加块响应速度
if opt.BlockCacheSize > 0 {
// 缓存不在此次源码阅读的讨论范围之内,不影响核心功能,暂且略过
// 值得一提的是badger是使用的缓存是badger社区研发的一个高性能本地并发缓存的库,有兴趣的同学可以自行研究
numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
if numInCache == 0 {
// Make the value of this variable at least one since the cache requires
// the number of counters to be greater than zero.
numInCache = 1
}
config := ristretto.Config{
NumCounters: numInCache * 8,
MaxCost: opt.BlockCacheSize,
BufferItems: 64,
Metrics: true,
OnExit: table.BlockEvictHandler,
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, y.Wrap(err, "failed to create data cache")
}
}
// 索引缓存相关配置
// 索引是每个Key所对应的偏离量的值,每一个SSTable有一个元数据块即索引块
// 可以方便对Key的二分查找,定位当前的key在哪一个sstable文件里,在文件中的偏移量是多少
if opt.IndexCacheSize > 0 {
// Index size is around 5% of the table size.
indexSz := int64(float64(opt.MemTableSize) * 0.05)
numInCache := opt.IndexCacheSize / indexSz
if numInCache == 0 {
// Make the value of this variable at least one since the cache requires
// the number of counters to be greater than zero.
numInCache = 1
}
config := ristretto.Config{
NumCounters: numInCache * 8,
MaxCost: opt.IndexCacheSize,
BufferItems: 64,
Metrics: true,
}
db.indexCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, y.Wrap(err, "failed to create bf cache")
}
}
// 对缓存模块的监控检测
db.closers.cacheHealth = z.NewCloser(1)
go db.monitorCache(db.closers.cacheHealth)
// 如果仅基于内存
if db.opt.InMemory {
// 默认关闭写同步
db.opt.SyncWrites = false
// If badger is running in memory mode, push everything into the LSM Tree.
// 把所有数据只写在LSM-T中
db.opt.ValueThreshold = math.MaxInt32
}
// Key的注册,与并发事务相关,之后再详细展开
krOpt := KeyRegistryOptions{
ReadOnly: opt.ReadOnly,
Dir: opt.Dir,
EncryptionKey: opt.EncryptionKey,
EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
InMemory: opt.InMemory,
}
db.registry, _ = OpenKeyRegistry(krOpt)
// 计算消耗的内存等数据统计信息
db.calculateSize()
db.closers.updateSize = z.NewCloser(1)
go db.updateSize(db.closers.updateSize)
// 打开一个memTable实例
// memtable是在内存中的一个复杂数据结构
if err := db.openMemTables(db.opt); err != nil {
return nil, y.Wrapf(err, "while opening memtables")
}
// 检查
if !db.opt.ReadOnly {
// 创建一个新的.mem文件
// .mem文件就是LSM-T中的预写日志文件(wal)
if db.mt, err = db.newMemTable(); err != nil {
return nil, y.Wrapf(err, "cannot create memtable")
}
}
// newLevelsController potentially loads files in directory.
// 创建内存中level管理器
// LSM-T是分层结构的, LevelsController实例负责维护整个层级结构
// 进行日志归并,压缩处理等操作,通过Manifest进行初始配置
// 或者是,manifest文件就是LevelController持久化之后的ondisk版本,可以加快badger的恢复重启速度
// 先打开SSTable,加载索引块,元数据块,缓存到内存当中
if db.lc, err = newLevelsController(db, &manifest); err != nil {
return db, err
}
// Initialize vlog struct.
// 初始化vlog
db.vlog.init(db)
if !opt.ReadOnly {
// 启动日志归并的工作协程,后续再展开
db.closers.compactors = z.NewCloser(1)
db.lc.startCompact(db.closers.compactors)
db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
// Flush them to disk asap.
for _, mt := range db.imm {
db.flushChan <- flushTask{mt: mt}
}
}
// We do increment nextTxnTs below. So, no need to do it here.
// 拿到启动时最大事务的版本号(时间戳)
db.orc.nextTxnTs = db.MaxVersion()
db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
// 真正打开vlog文件
if err = db.vlog.open(db); err != nil {
return db, y.Wrapf(err, "During db.vlog.open")
}
// Let's advance nextTxnTs to one more than whatever we observed via
// replaying the logs.
// 事务相关,等待之前事务的恢复
db.orc.txnMark.Done(db.orc.nextTxnTs)
// In normal mode, we must update readMark so older versions of keys can be removed during
// compaction when run in offline mode via the flatten tool.
db.orc.readMark.Done(db.orc.nextTxnTs)
// 事务号自增
db.orc.incrementNextTs()
// 监听配置信息的更改
go db.threshold.listenForValueThresholdUpdate()
// 从数据库中检索被禁止的命名空间并更新内存结构(非重点)
if err := db.initBannedNamespaces(); err != nil {
return db, errors.Wrapf(err, "While setting banned keys")
}
// 启动处理磁盘写请求的协程
// badger的写任务是并发写任务,可以充分发挥ssd的性能
db.closers.writes = z.NewCloser(1)
go db.doWrites(db.closers.writes)
if !db.opt.InMemory {
// 真正开启vlog的GC, 后面再详细讲解
db.closers.valueGC = z.NewCloser(1)
go db.vlog.waitOnGC(db.closers.valueGC)
}
// 监听协程(非重点)
db.closers.pub = z.NewCloser(1)
go db.pub.listenForUpdates(db.closers.pub)
// 释放锁
valueDirLockGuard = nil
dirLockGuard = nil
manifestFile = nil
// 返回db
return db, nil
}
|