Featured image of post Badger源码导读(一) DB初始化

Badger源码导读(一) DB初始化

通读Badger源码,学习LSM-T结构存储引擎 - DB初始化

Badger源码导读

源码分析入口基准案例

先从Badger的基本使用入手

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func main() {
	// 打开db
	db, _ := badger.Open(badger.DefaultOptions("tmp/badger"))
	defer db.Close()

	// 读写事务
	err := db.Update(func(txn *badger.Txn) error {
		txn.Set([]byte("answer"), []byte("42"))
		txn.Get([]byte("answer"))
		return nil
	})

	// 只读事务
	err = db.View(func(txn *badger.Txn) error {
		txn.Get([]byte("answer_v1"))
		return nil
	})

	// 遍历keys
	err = db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchSize = 10
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			k := item.Key()
			err := item.Value(func(val []byte) error {
				fmt.Printf("key=%s, value=%s\n", k, val)
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	err = db.RunValueLogGC(0.7)
	_ = err
}

DB初始化过程

初始化参数

badger.open()传入的是一个option,先看一下option结构体的字段都有哪些

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
type Options struct {
    // Required options.

	// Dir: Badger是KV分离的存储引擎,Dir位置存储的是 Key 和指向Value的逻辑指针
	// ValueDir: 存储的是Value日志,即值所在的地址,默认情况下Dir和ValueDir在同一个path目录下
	Dir      string
	ValueDir string

	// Usually modified options.
    
    // SyncWrites: 同步写,即写入的时候主动同步到磁盘(mmap不会立即刷盘)
	SyncWrites        bool
	NumVersionsToKeep int
    // ReadOnly: 如其名,是否设置为只读
   	ReadOnly          bool
    // Logger: 如其名,log对象
   	Logger            Logger
    // Compression: 压缩归并的级别
   	Compression       options.CompressionType
    // InMemory: 是否只基于内存
   	InMemory          bool
   	MetricsEnabled    bool
   	// Sets the Stream.numGo field
   	NumGoroutines int

   	// Fine tuning options.

    // MemTableSize: 内存表的尺寸限制
	MemTableSize        int64
   	BaseTableSize       int64
   	BaseLevelSize       int64
   	LevelSizeMultiplier int
   	TableSizeMultiplier int
    // MaxLevels: 最大容忍的level级别,LSM-T的级数L0-L(max-1)
   	MaxLevels           int

   	VLogPercentile float64
    // ValueThreshold: 值大小的阈值,如果Value的大小不超过这个设定值,则不会将KV进行分离
    // 此处是在工业实践中的一种权衡,KV分离会造成不可避免的读放大
    // (两次的随机读,先在LSM-T中读取一次指针,再通过指针从ValueLog中读取一次值)
   	ValueThreshold int64
    // NumMemtables: 内存表的数量
   	NumMemtables   int
   	// Changing BlockSize across DB runs will not break badger. The block size is
   	// read from the block index stored at the end of the table.
    // BlockSize: 每个block的大小(sst由block和index等组成)
   	BlockSize          int
    // BloomFalsePositive: 布隆过滤器假阳性的比例
   	BloomFalsePositive float64
    // BlockCacheSize: 块缓存的大小
   	BlockCacheSize     int64
    // IndexCacheSize: 索引缓存的大小
   	IndexCacheSize     int64

   	NumLevelZeroTables      int
	NumLevelZeroTablesStall int

    // ValueLogFileSize: 存储值的Valuelog文件的最大大小
   	ValueLogFileSize   int64
    // ValueLogMaxEntries: 存储值的Valuelog文件的最大键值对数量
    ValueLogMaxEntries uint32

    // NumCompactors: 日志合并压缩协程同时运行的最大数量
   	NumCompactors        int
	CompactL0OnClose     bool
   	LmaxCompaction       bool
   	ZSTDCompressionLevel int

   	// When set, checksum will be validated for each entry read from the value log file.
    // VerifyValueChecksum: 是否进行参数校验值的检查
   	VerifyValueChecksum bool

   	// Encryption related options.
    // EncryptionKey: 加密字段 
   	EncryptionKey                 []byte        // encryption key
    // EncryptionKeyRotationDuration: 加密字段有效时长
   	EncryptionKeyRotationDuration time.Duration // key rotation duration

   	// BypassLockGuard will bypass the lock guard on badger. Bypassing lock
   	// guard can cause data corruption if multiple badger instances are using
   	// the same directory. Use this options with caution.
   	BypassLockGuard bool

   	// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
   	ChecksumVerificationMode options.ChecksumVerificationMode

	// DetectConflicts determines whether the transactions would be checked for
   	// conflicts. The transactions can be processed at a higher rate when
   	// conflict detection is disabled.
    // DetectConflicts: 事务的冲突检测 
   	DetectConflicts bool

   	// NamespaceOffset specifies the offset from where the next 8 bytes contains the namespace.
   	NamespaceOffset int

   	// Transaction start and commit timestamps are managed by end-user.
   	// This is only useful for databases built on top of Badger (like Dgraph).
   	// Not recommended for most users.
   	managedTxns bool

   	// 4. Flags for testing purposes
   	// ------------------------------
    // 有关批处理的参数
   	maxBatchCount int64 // max entries in batch
   	maxBatchSize  int64 // max batch size in bytes

	maxValueThreshold float64
}

传入指定的路径,并默认配置信息,如果有需要更改的信息可以使用 WithX() 方法(此处使用了建造者模式

badger.DefaultOptions("tmp/badger")

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// DefaultOptions sets a list of recommended options for good performance.
// Feel free to modify these to suit your needs with the WithX methods.
func DefaultOptions(path string) Options {
	return Options{

		Dir:      path,
		ValueDir: path,

		MemTableSize:        64 << 20,
		BaseTableSize:       2 << 20,
		BaseLevelSize:       10 << 20,
		TableSizeMultiplier: 2,
		LevelSizeMultiplier: 10,
		MaxLevels:           7,
		NumGoroutines:       8,
		MetricsEnabled:      true,

		NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
		NumLevelZeroTables:      5,
		NumLevelZeroTablesStall: 15,
		NumMemtables:            5,
		BloomFalsePositive:      0.01,
		BlockSize:               4 * 1024,
		SyncWrites:              false,
		NumVersionsToKeep:       1,
		CompactL0OnClose:        false,
		VerifyValueChecksum:     false,
		Compression:             options.Snappy,
		BlockCacheSize:          256 << 20,
		IndexCacheSize:          0,

		// The following benchmarks were done on a 4 KB block size (default block size). The
		// compression is ratio supposed to increase with increasing compression level but since the
		// input for compression algorithm is small (4 KB), we don't get significant benefit at
		// level 3.
		// NOTE: The benchmarks are with DataDog ZSTD that requires CGO. Hence, no longer valid.
		// no_compression-16              10	 502848865 ns/op	 165.46 MB/s	-
		// zstd_compression/level_1-16     7	 739037966 ns/op	 112.58 MB/s	2.93
		// zstd_compression/level_3-16     7	 756950250 ns/op	 109.91 MB/s	2.72
		// zstd_compression/level_15-16    1	11135686219 ns/op	   7.47 MB/s	4.38
		// Benchmark code can be found in table/builder_test.go file
		ZSTDCompressionLevel: 1,

		// Nothing to read/write value log using standard File I/O
		// MemoryMap to mmap() the value log files
		// (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
		// -1 so 2*ValueLogFileSize won't overflow on 32-bit systems.
		ValueLogFileSize: 1<<30 - 1,

		ValueLogMaxEntries: 1000000,

		VLogPercentile: 0.0,
		ValueThreshold: maxValueThreshold,

		Logger:                        defaultLogger(INFO),
		EncryptionKey:                 []byte{},
		EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
		DetectConflicts:               true,
		NamespaceOffset:               -1,
	}
}

Open函数(核心)

badger.Open(opt) 函数

此方法代码过长,在此只保留核心部分代码,部分逻辑将以伪代码或注释表示,并省去部分错误处理逻辑

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
func Open(opt Options) (*DB, error) {
    
    // 检查参数
	checkAndSetOptions(&opt)
    
    // 创建了三个目录锁,防止其他进程注册到同一个目录造成冲突
	var dirLockGuard, valueDirLockGuard *directoryLockGuard

	// Create directories and acquire lock on it only if badger is not running in InMemory mode.
	// We don't have any directories/files in InMemory mode so we don't need to acquire
	// any locks on them.
    // 判断参数配置为只基于内存
	if !opt.InMemory {
        // 创建目录
		createDirs(opt)

		var err error
		if !opt.BypassLockGuard {
            // 给Dir加目录锁
			dirLockGuard, _ = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
			// 方法末尾释放锁
			defer func() {
				if dirLockGuard != nil {
					_ = dirLockGuard.release()
				}
			}()
            // 获取Key&ValuePtr的绝对路径
			absDir, _ := filepath.Abs(opt.Dir)

            // 获取ValueLog的绝对路径
			absValueDir, _ := filepath.Abs(opt.ValueDir)
			
            // 如果ValueDir和Dir不相同,需要各自加锁
			if absValueDir != absDir {
            	// 给ValueDir加目录锁
				valueDirLockGuard, _ = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)

                // 释放锁
				defer func() {
					if valueDirLockGuard != nil {
						_ = valueDirLockGuard.release()
					}
				}()
			}
		}
	}

    // 打开或创建Manifest文件,(采用mmap方式打开,在后面详细展开)
	manifestFile, manifest, _ := openOrCreateManifestFile(opt)
    
	// 关闭Manifest文件
	defer func() {
		if manifestFile != nil {
			_ = manifestFile.close()
		}
	}()

    // 创建内存中的db数据结构
	db := &DB{
        // memtable, 因为有多个,所以要创建数组
		imm:              make([]*memTable, 0, opt.NumMemtables),
        // 刷新请求的channel
		flushChan:        make(chan flushTask, opt.NumMemtables),
		// 写请求的channel
        writeCh:          make(chan *request, kvWriteChCapacity),
        // 配置信息opt
		opt:              opt,
        // 刚初始化好的manifest实例
        manifest:         manifestFile,
        // Key&ValuePtr目录锁
		dirLockGuard:     dirLockGuard,
		// Value目录锁
        valueDirGuard:    valueDirLockGuard,
        // Oracle的实例,一个KV引擎并发事务的管理器,负责分配事务的版本号,用来实现MVCC功能,在读写事务时详细展开
		orc:              newOracle(opt),
		pub:              newPublisher(),
		allocPool:        z.NewAllocatorPool(8),
		bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
		threshold:        initVlogThreshold(&opt),
	}
	// Cleanup all the goroutines started by badger in case of an error.
    // 关闭badger的所有任务协程的钩子函数
	defer func() {
		if err != nil {
			opt.Errorf("Received err: %v. Cleaning up...", err)
			db.cleanup()
			db = nil
		}
	}()
	
    // 块缓存相关配置
    // LSM-T结构中SST里面数据是以块(block)为单位分割的
    // 当开启块缓存之后,LSM-T会把最近被访问到的高热的块缓存在内存中,以加块响应速度
	if opt.BlockCacheSize > 0 {
        // 缓存不在此次源码阅读的讨论范围之内,不影响核心功能,暂且略过
        // 值得一提的是badger是使用的缓存是badger社区研发的一个高性能本地并发缓存的库,有兴趣的同学可以自行研究
		numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
		if numInCache == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			numInCache = 1
		}

		config := ristretto.Config{
			NumCounters: numInCache * 8,
			MaxCost:     opt.BlockCacheSize,
			BufferItems: 64,
			Metrics:     true,
			OnExit:      table.BlockEvictHandler,
		}
		db.blockCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create data cache")
		}
	}
    
	// 索引缓存相关配置
    // 索引是每个Key所对应的偏离量的值,每一个SSTable有一个元数据块即索引块
    // 可以方便对Key的二分查找,定位当前的key在哪一个sstable文件里,在文件中的偏移量是多少
	if opt.IndexCacheSize > 0 {
		// Index size is around 5% of the table size.
		indexSz := int64(float64(opt.MemTableSize) * 0.05)
		numInCache := opt.IndexCacheSize / indexSz
		if numInCache == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			numInCache = 1
		}

		config := ristretto.Config{
			NumCounters: numInCache * 8,
			MaxCost:     opt.IndexCacheSize,
			BufferItems: 64,
			Metrics:     true,
		}
		db.indexCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create bf cache")
		}
	}

    // 对缓存模块的监控检测
	db.closers.cacheHealth = z.NewCloser(1)
	go db.monitorCache(db.closers.cacheHealth)
	
    // 如果仅基于内存
	if db.opt.InMemory {
        // 默认关闭写同步
		db.opt.SyncWrites = false
		// If badger is running in memory mode, push everything into the LSM Tree.
        // 把所有数据只写在LSM-T中
		db.opt.ValueThreshold = math.MaxInt32
	}
    
    // Key的注册,与并发事务相关,之后再详细展开
	krOpt := KeyRegistryOptions{
		ReadOnly:                      opt.ReadOnly,
		Dir:                           opt.Dir,
		EncryptionKey:                 opt.EncryptionKey,
		EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
		InMemory:                      opt.InMemory,
	}
	db.registry, _ = OpenKeyRegistry(krOpt)
	
    // 计算消耗的内存等数据统计信息
	db.calculateSize()
	db.closers.updateSize = z.NewCloser(1)
	go db.updateSize(db.closers.updateSize)

    // 打开一个memTable实例
    // memtable是在内存中的一个复杂数据结构
	if err := db.openMemTables(db.opt); err != nil {
		return nil, y.Wrapf(err, "while opening memtables")
	}
	// 检查
	if !db.opt.ReadOnly {
        // 创建一个新的.mem文件
        // .mem文件就是LSM-T中的预写日志文件(wal)
		if db.mt, err = db.newMemTable(); err != nil {
			return nil, y.Wrapf(err, "cannot create memtable")
		}
	}

	// newLevelsController potentially loads files in directory.
    // 创建内存中level管理器
    // LSM-T是分层结构的, LevelsController实例负责维护整个层级结构
    // 进行日志归并,压缩处理等操作,通过Manifest进行初始配置
    // 或者是,manifest文件就是LevelController持久化之后的ondisk版本,可以加快badger的恢复重启速度
    // 先打开SSTable,加载索引块,元数据块,缓存到内存当中
	if db.lc, err = newLevelsController(db, &manifest); err != nil {
		return db, err
	}

	// Initialize vlog struct.
    // 初始化vlog
	db.vlog.init(db)

	if !opt.ReadOnly {
        // 启动日志归并的工作协程,后续再展开
		db.closers.compactors = z.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)

		db.closers.memtable = z.NewCloser(1)
		go func() {
			_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
		}()
		// Flush them to disk asap.
		for _, mt := range db.imm {
			db.flushChan <- flushTask{mt: mt}
		}
	}
	// We do increment nextTxnTs below. So, no need to do it here.
    // 拿到启动时最大事务的版本号(时间戳)
	db.orc.nextTxnTs = db.MaxVersion()
	db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)

    // 真正打开vlog文件
	if err = db.vlog.open(db); err != nil {
		return db, y.Wrapf(err, "During db.vlog.open")
	}

	// Let's advance nextTxnTs to one more than whatever we observed via
	// replaying the logs.
    // 事务相关,等待之前事务的恢复
	db.orc.txnMark.Done(db.orc.nextTxnTs)
	// In normal mode, we must update readMark so older versions of keys can be removed during
	// compaction when run in offline mode via the flatten tool.
	db.orc.readMark.Done(db.orc.nextTxnTs)
    // 事务号自增
	db.orc.incrementNextTs()
	
    // 监听配置信息的更改
	go db.threshold.listenForValueThresholdUpdate()

    // 从数据库中检索被禁止的命名空间并更新内存结构(非重点)
	if err := db.initBannedNamespaces(); err != nil {
		return db, errors.Wrapf(err, "While setting banned keys")
	}
	
    // 启动处理磁盘写请求的协程
    // badger的写任务是并发写任务,可以充分发挥ssd的性能
	db.closers.writes = z.NewCloser(1)
	go db.doWrites(db.closers.writes)

	if !db.opt.InMemory {
        // 真正开启vlog的GC, 后面再详细讲解
		db.closers.valueGC = z.NewCloser(1)
		go db.vlog.waitOnGC(db.closers.valueGC)
	}

    // 监听协程(非重点)
	db.closers.pub = z.NewCloser(1)
	go db.pub.listenForUpdates(db.closers.pub)

    // 释放锁
	valueDirLockGuard = nil
	dirLockGuard = nil
	manifestFile = nil
    // 返回db
	return db, nil
}

创建Manifest文件

openOrCreateManifestFile(opt) 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func openOrCreateManifestFile(opt Options) (ret *manifestFile, result Manifest, err error) {
    // 如果Inmemory则返回空的Manifest
	if opt.InMemory {
		return &manifestFile{inMemory: true}, Manifest{}, nil
	}
	return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, manifestDeletionsRewriteThreshold)
}


func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (*manifestFile, Manifest, error) {
	// 拼接path
	path := filepath.Join(dir, ManifestFilename)
	var flags y.Flags
	if readOnly {
		flags |= y.ReadOnly
	}
    // 尝试打开文件
	fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
	if err != nil {
        // 校验文件是否存在
		if !os.IsNotExist(err) {
			return nil, Manifest{}, err
		}
        // 如果仅读则无法创建直接返回
		if readOnly {
			return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
		}
        // 真正创建manifest实例
		m := createManifest()
        // 覆盖写,执行完此条语句后就可以在目录中看到MANIFEST文件存在了(此时MANIFEST文件中仅有魔数bdg)
		fp, netCreations, _ := helpRewrite(dir, &m)
		
        // 断言,确保创建成功
		y.AssertTrue(netCreations == 0)
        // 创建manifestFile实例在内存中保存信息
		mf := &manifestFile{
			fp:                        fp,
			directory:                 dir,
			manifest:                  m.clone(),
			deletionsRewriteThreshold: deletionsThreshold,
		}
		return mf, m, nil
	}
	
    // 文件存在加载恢复的逻辑暂不展开
    ......
}

func createManifest() Manifest {
	levels := make([]levelManifest, 0)
	return Manifest{
		Levels: levels,
		Tables: make(map[uint64]TableManifest),
	}
    // Tables: map[uint64]TableManifest
    // uint64: 行号,第n个level
}

打开Memtable

memtable 结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
type memTable struct {
	sl         *skl.Skiplist
	wal        *logFile
	maxVersion uint64
	opt        Options
	buf        *bytes.Buffer
}

openMemTables(opt) 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (db *DB) openMemTables(opt Options) error {
	// We don't need to open any tables in in-memory mode.
    // 如果是只基于内存则直接返回(那我走?)
	if db.opt.InMemory {
		return nil
	}
    // 读取目录中的全部文件
	files, _ := ioutil.ReadDir(db.opt.Dir)

	var fids []int
    // 遍历目录中的文件
	for _, file := range files {
        // 检查当前文件名是否包含一个.mem的后缀(在第一次初始化过程中肯定不会存在)
        // 此时目录中应有的文件为 LOCK MANIFEST KEYREGISTRY
		if !strings.HasSuffix(file.Name(), memFileExt) {
			continue
		}
        // 如果有.mem文件,则取文件的命名转为int值作为fid
        // 例: 000001.mem 000002.mem
		fsz := len(file.Name())
		fid, _ := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)

		fids = append(fids, int(fid))
	}

	// Sort in ascending order.
    // 按照fid排序
	sort.Slice(fids, func(i, j int) bool {
		return fids[i] < fids[j]
	})
    // 按照fid顺序遍历
	for _, fid := range fids {
		flags := os.O_RDWR
		if db.opt.ReadOnly {
			flags = os.O_RDONLY
		}
        // 真正的打开.mem文件,采用mmap方式加载.mem文件中的数据
		mt, err := db.openMemTable(fid, flags)
		if err != nil {
			return y.Wrapf(err, "while opening fid: %d", fid)
		}
		// If this memtable is empty we don't need to add it. This is a
		// memtable that was completely truncated.
		if mt.sl.Empty() {
			mt.DecrRef()
			continue
		}
		// These should no longer be written to. So, make them part of the imm.
		db.imm = append(db.imm, mt)
	}
    // 设置最新的fid序列号
	if len(fids) != 0 {
		db.nextMemFid = fids[len(fids)-1]
	}
	db.nextMemFid++
	return nil
}

创建Memtable

newMemTable() 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (db *DB) newMemTable() (*memTable, error) {
    // 真正创建.mem文件
	mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
	if err == z.NewFile {
		db.nextMemFid++
		return mt, nil
	}

	if err != nil {
		db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
		return nil, y.Wrapf(err, "newMemTable")
	}
	return nil, errors.Errorf("File %s already exists", mt.wal.Fd.Name())
}

openMemTable(fid, flags) 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
	// 拼接路径
    filepath := db.mtFilePath(fid)
	// 创建memtable中的skiplist
    s := skl.NewSkiplist(arenaSize(db.opt))
    // 创建memtable实例
	mt := &memTable{
		sl:  s,
		opt: db.opt,
		buf: &bytes.Buffer{},
	}
	// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
    // 如果只基于内存,则不需要创建wal文件,直接返回
	if db.opt.InMemory {
		return mt, z.NewFile
	}
	// 创建wal文件实例
	mt.wal = &logFile{
		fid:      uint32(fid),
		path:     filepath,
		registry: db.registry,
		writeAt:  vlogHeaderSize,
		opt:      db.opt,
	}
    // 调用系统函数创建wal文件
	lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
    // 如果未成功创建新文件或其他失败则返回err
	if lerr != z.NewFile && lerr != nil {
		return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
	}

	// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
	// when it gets flushed to L0.
    // 用来关闭的回调函数
	s.OnClose = func() {
		if err := mt.wal.Delete(); err != nil {
			db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
		}
	}
    // 成功创建mmap则返回 lerr (z.NewFile)
	if lerr == z.NewFile {
		return mt, lerr
	}
    // 当且仅当MemTableSize设置为0时造成 lerr == nil的适合执行到此
    // 此时mmap未进行截断,在UpdateSkipList()中遍历wal文件并重新截断,如果wal文件不存在会返回错误
	err := mt.UpdateSkipList()
	return mt, y.Wrapf(err, "while updating skiplist")
}

创建levelController

newLevelsController(db, mf) 函数

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
    // 断言,进行一些必要的校验
	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
    // 关联db实例,创建level数组对应层级关系(例:levels[0] => L0层)
    // levelHandler就是真正负责某一层sst管理器的主要操作
	s := &levelsController{
		kv:     db,
		levels: make([]*levelHandler, db.opt.MaxLevels),
	}
    // 状态统计的一个的对象(set结构),key为fid,用以判断对应的fid是否存在于这一层
	s.cstatus.tables = make(map[uint64]struct{})
    // 合并状态的信息
	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
	
    // 按层遍历,每一层都创建一个levelhandler实例
	for i := 0; i < db.opt.MaxLevels; i++ {
		s.levels[i] = newLevelHandler(db, i)
		s.cstatus.levels[i] = new(levelCompactStatus)
	}
	// 基于内存,那我走?🤡
	if db.opt.InMemory {
		return s, nil
	}
	// Compare manifest against directory, check for existent/non-existent files, and remove.
    // 对manifest文件进行校验
	if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
		return nil, err
	}

	var mu sync.Mutex
	tables := make([][]*table.Table, db.opt.MaxLevels)
	var maxFileID uint64

	// We found that using 3 goroutines allows disk throughput to be utilized to its max.
	// Disk utilization is the main thing we should focus on, while trying to read the data. That's
	// the one factor that remains constant between HDD and SSD.
    // 一种针对并发控制的负载均衡策略,对于ssd来说,创建3个协程能够最大的发挥ssd的优点
	throttle := y.NewThrottle(3)

	start := time.Now()
	var numOpened int32
    // 创建一个定时触发器进行超时控制
	tick := time.NewTicker(3 * time.Second)
    // 钩子函数关闭定时器
	defer tick.Stop()
	
    // manifest清单文件的Tables
    // 拿到每个table对应的fid
    // 第一次初始化的适合因为Tables为空,会直接跳过
	for fileID, tf := range mf.Tables {
		fname := table.NewFilename(fileID, db.opt.Dir)
		select {
		case <-tick.C:
			db.opt.Infof("%d tables out of %d opened in %s\n", atomic.LoadInt32(&numOpened),
				len(mf.Tables), time.Since(start).Round(time.Millisecond))
		default:
		}
		if err := throttle.Do(); err != nil {
			closeAllTables(tables)
			return nil, err
		}
		if fileID > maxFileID {
			maxFileID = fileID
		}
		go func(fname string, tf TableManifest) {
			var rerr error
			defer func() {
				throttle.Done(rerr)
				atomic.AddInt32(&numOpened, 1)
			}()
			dk, err := db.registry.DataKey(tf.KeyID)
			if err != nil {
				rerr = y.Wrapf(err, "Error while reading datakey")
				return
			}
			topt := buildTableOptions(db)
			// Explicitly set Compression and DataKey based on how the table was generated.
			topt.Compression = tf.Compression
			topt.DataKey = dk

			mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
			if err != nil {
				rerr = y.Wrapf(err, "Opening file: %q", fname)
				return
			}
			t, err := table.OpenTable(mf, topt)
			if err != nil {
				if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
					db.opt.Errorf(err.Error())
					db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
					// Do not set rerr. We will continue without this table.
				} else {
					rerr = y.Wrapf(err, "Opening table: %q", fname)
				}
				return
			}

			mu.Lock()
			tables[tf.Level] = append(tables[tf.Level], t)
			mu.Unlock()
		}(fname, tf)
	}
    // 关闭相关的任务协程
	if err := throttle.Finish(); err != nil {
		closeAllTables(tables)
		return nil, err
	}
	db.opt.Infof("All %d tables opened in %s\n", atomic.LoadInt32(&numOpened),
		time.Since(start).Round(time.Millisecond))
    // 记录当前fid最大值
	s.nextFileID = maxFileID + 1
    // 初始化每个level的tables
	for i, tbls := range tables {
		s.levels[i].initTables(tbls)
	}

	// Make sure key ranges do not overlap etc.
    // 必要的数据校验
	if err := s.validate(); err != nil {
		_ = s.cleanupLevels()
		return nil, y.Wrap(err, "Level validation")
	}

	// Sync directory (because we have at least removed some files, or previously created the
	// manifest file).
    // 手动进行同步刷盘
	if err := syncDir(db.opt.Dir); err != nil {
		_ = s.close()
		return nil, err
	}

	return s, nil
}
创建levelHandler

newLevelHandler(db, level) 函数

1
2
3
4
5
6
7
func newLevelHandler(db *DB, level int) *levelHandler {
	return &levelHandler{
		level:    level,
		strLevel: fmt.Sprintf("l%d", level),
		db:       db,
	}
}
初始化tables

initTables(tables) 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// initTables replaces s.tables with given tables. This is done during loading.
func (s *levelHandler) initTables(tables []*table.Table) {
    // 加锁
	s.Lock()
	defer s.Unlock()
	
    // 赋值与相关值的初始化
	s.tables = tables
	s.totalSize = 0
	s.totalStaleSize = 0
	for _, t := range tables {
		s.addSize(t)
	}
	// 如果是L0层,需要拿每个fid排序
	if s.level == 0 {
		// Key range will overlap. Just sort by fileID in ascending order
		// because newer tables are at the end of level 0.
		sort.Slice(s.tables, func(i, j int) bool {
			return s.tables[i].ID() < s.tables[j].ID()
		})
	} else {
        // L0层往上,拿每个table文件的MinKey排序
		// Sort tables by keys.
		sort.Slice(s.tables, func(i, j int) bool {
			return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
		})
	}
}

初始化vlog

init(db) 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
    // 加载配置
	vlog.opt = db.opt
	vlog.db = db
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
    
    // inmem,那我走?🤡
	if vlog.opt.InMemory {
		return
	}
    // 指定的vlog目录
	vlog.dirPath = vlog.opt.ValueDir
	// GC模块用到的channel
	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
    // 创建一个GC模块相关文件
	lf, err := InitDiscardStats(vlog.opt)
	y.Check(err)
	vlog.discardStats = lf
}

打开vlog

open(db) 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
func (vlog *valueLog) open(db *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
    // 不想再做解释了,inmem,那我走!!!
	if db.opt.InMemory {
		return nil
	}
	// 填充文件fid到filesMap
	if err := vlog.populateFilesMap(); err != nil {
		return err
	}
	// If no files are found, then create a new file.
    // 如果没有.vlog文件
	if len(vlog.filesMap) == 0 {
		if vlog.opt.ReadOnly {
			return nil
		}
        // 创建一个.vlog文件
		_, err := vlog.createVlogFile()
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	fids := vlog.sortedFids()
	for _, fid := range fids {
		lf, ok := vlog.filesMap[fid]
		y.AssertTrue(ok)

		// Just open in RDWR mode. This should not create a new log file.
		lf.opt = vlog.opt
		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
			2*vlog.opt.ValueLogFileSize); err != nil {
			return y.Wrapf(err, "Open existing file: %q", lf.path)
		}
		// We shouldn't delete the maxFid file.
		if lf.size == vlogHeaderSize && fid != vlog.maxFid {
			vlog.opt.Infof("Deleting empty file: %s", lf.path)
			if err := lf.Delete(); err != nil {
				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
			}
			delete(vlog.filesMap, fid)
		}
	}

	if vlog.opt.ReadOnly {
		return nil
	}
	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	last, ok := vlog.filesMap[vlog.maxFid]
	y.AssertTrue(ok)
	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
		func(_ Entry, vp valuePointer) error {
			return nil
		})
	if err != nil {
		return y.Wrapf(err, "while iterating over: %s", last.path)
	}
	if err := last.Truncate(int64(lastOff)); err != nil {
		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
	}

	// Don't write to the old log file. Always create a new one.
	if _, err := vlog.createVlogFile(); err != nil {
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	return nil
}

populateFilesMap() 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (vlog *valueLog) populateFilesMap() error {
	vlog.filesMap = make(map[uint32]*logFile)

    // 从目录中拿到每个文件的句柄
	files, _ := ioutil.ReadDir(vlog.dirPath)

	found := make(map[uint64]struct{})
	for _, file := range files {
        // 判断是否以.vlog作为后缀
		if !strings.HasSuffix(file.Name(), ".vlog") {
			continue
		}
        // 对.vlog文件进行校验,去除fid,进行消重判断
		fsz := len(file.Name())
		fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
		if err != nil {
			return errFile(err, file.Name(), "Unable to parse log id.")
		}
		if _, ok := found[fid]; ok {
			return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
		}
		found[fid] = struct{}{}

		lf := &logFile{
			fid:      uint32(fid),
			path:     vlog.fpath(uint32(fid)),
			registry: vlog.db.registry,
		}
        // 最后保存到vlog的filesMap当中
		vlog.filesMap[uint32(fid)] = lf
		if vlog.maxFid < uint32(fid) {
			vlog.maxFid = uint32(fid)
		}
	}
    // 直到每个.vlog文件的fid都添加到了map中
    // 第一次初始化时没有.vlog文件,故直接跳过
	return nil
}
创建vlog文件

createVlogFile() 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (vlog *valueLog) createVlogFile() (*logFile, error) {
    // 最大的fid
	fid := vlog.maxFid + 1
    // 根据fid命名
	path := vlog.fpath(fid)
    // 创建一个句柄实例
	lf := &logFile{
		fid:      fid,
		path:     path,
		registry: vlog.db.registry,
		writeAt:  vlogHeaderSize,
		opt:      vlog.opt,
	}
    // 进行系统调用打开文件,通过mmap的方式
    // .vlog文件初始化时会创建一个2G的文件
	err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
	if err != z.NewFile && err != nil {
		return nil, err
	}
	
    // 进行数据初始化更新的操作
	vlog.filesLock.Lock()
	vlog.filesMap[fid] = lf
	y.AssertTrue(vlog.maxFid < fid)
	vlog.maxFid = fid
	// writableLogOffset is only written by write func, by read by Read func.
	// To avoid a race condition, all reads and updates to this variable must be
	// done via atomics.
	atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize)
	vlog.numEntriesWritten = 0
	vlog.filesLock.Unlock()

	return lf, nil
}

小结

通过本章,我们走上了Badger源码路上的第一步,了解了一个LSM-T结构的存储引擎初始化时都要做哪些准备

Built with Hugo
主题 StackJimmy 设计