服务计算 - 6 BoltDB学习与简单剖析

BoltDB 介绍

Bolt is a pure Go key/value store inspired by Howard Chu’s LMDB project. The goal of the project is to provide a simple, fast, and reliable database for projects that don’t require a full database server such as Postgres or MySQL.
Since Bolt is meant to be used as such a low-level piece of functionality, simplicity is key. The API will be small and only focus on getting values and setting values. That’s it.

BoltDB是一个嵌入式key/value的数据库,即只需要将其链接到你的应用程序代码中即可使用BoltDB提供的API来高效的存取数据。而且BoltDB支持完全可序列化的ACID事务,让应用程序可以更简单的处理复杂操作

BoltDB 使用

BoltDB使用Golang开发

安装

1
$ go get github.com/boltdb/bolt

创建和启动数据库

使用bolt.Open()函数进行创建或者打开数据库

1
db, err := bolt.Open("data.db", 0600, nil)

该命令将打开当前目录下data.db文件,如果该文件不存在,则将被创建

Open方法三个参数

  • 第一个参数为路径,指定需要打开的数据库,不存在将会被创建
  • 第二个参数为文件操作权限
  • 第三个参数为可选参数,内部可以配置只读和超时等

BoltDB是文件操作类型的数据库,打开数据库后将有一个文件锁,不允许多个进程同时打开同一个数据库。如果多个同时操作的话后者会被挂起直到前者关闭操作为止,为了避免无限等待,在打开数据库的时候可以配置超时时间

1
db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})

读写事务

对于读写事务,可以使用DB.Update来完成

1
2
3
4
err := db.Update(func(tx *bolt.Tx) error {
...
return nil
})

DB.Update方法可以对数据库进行读写操作。在闭包内部,可以获得数据库的一致视图。 在结尾返回nil来提交事务,也可以通过返回错误随时回滚事务。 读写事务中允许所有数据库操作,始终检查返回错误,因为它将报告可能导致您的事务无法完成的任何磁盘故障。 如果您在关闭中返回错误,它将被传递。

  • Example
    需要注意在读写事务以及只读事务中,如果没有相应的Bucket,是不能直接对其进行操作的,必须先创建

    1
    2
    3
    4
    5
    // create bucket
    db.Update(func(tx *bolt.Tx) error {
    _, err := tx.CreateBucketIfNotExists([]byte(peopleBucket))
    CheckErr(err)
    });

    peopleBucket桶中插入一个People,其key值为其ID,value为People的序列化后的字符串

    1
    2
    3
    4
    5
    6
    7
    8
    9
    it = model.People{}
    it.ID = "1"
    db.Update(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte(peopleBucket))
    jsons, errs := json.Marshal(it)
    CheckErr(errs)
    err := b.Put([]byte(it.ID), jsons)
    return err
    })

只读事务

对于只读事务,使用DB.View来完成

1
2
3
4
err := db.View(func(tx *bolt.Tx) error {
...
return nil
})

在此闭包中获得数据库的一致视图,但是,只读事务中不允许进行变更操作。 只能在只读事务中检索存储桶,检索值和复制数据库。需要注意的是读写事务和只读事务不应该在同一个goroutine里同时打开,这是由于读写事务需要周期性重新映射数据文件,与只读事务发生冲突,可能造成死锁。BoltDB一次只允许一个读写事务,单一次允许多个只读事务,保持数据一致性。

  • Example
    获取peopleBucket桶中key值为"1"value
    1
    2
    3
    4
    5
    6
    db.View(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte(peopleBucket))
    v := b.Get([]byte("1"))
    fmt.Printf("%s\n", v)
    return nil
    })

批量更新事务

每个DB.Update()等待磁盘提交写入。 通过将多个更新与DB.Batch()函数组合,可以最大限度地减少此开销

1
2
3
4
err := db.Batch(func(tx *bolt.Tx) error {
...
return nil
})

在整个批处理的过程中,如果某个事务失败了,批处理将会多次调用给定的函数。函数只有在DB.Batch返回成功才表示成功完成。如果批处理中途失败了,整个事务将会回滚。

更多

更多详细的用法请参考 boltdb/bolt

BoltDB 源码简单剖析

关于源码部分,纯属一些个人的理解以及网上相关一些讲解,对于某些部分可能理解不是很到位,还望多多指正,源码部分设计到比较底层具体文件存储部分感觉还是很难理解的,此部分只是简单的摸索一下其中的机制与一些方法的执行过程

首先先探索一下一开始启动数据库中其是设置文件锁

1
2
3
4
5
6
7
8
9
10
11
// Lock file so that other processes using Bolt in read-write mode cannot
// use the database at the same time. This would cause corruption since
// the two processes would write meta pages and free pages separately.
// The database file is locked exclusively (only one process can grab the lock)
// if !options.ReadOnly.
// The database file is locked using the shared lock (more than one process may
// hold a lock at the same time) otherwise (options.ReadOnly is set).
if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
_ = db.close()
return nil, err
}

呃其实这部分注释已经解释的很明确了,但是还是看看这个flock函数吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// flock acquires an advisory lock on a file descriptor.
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
var t time.Time
for {
// If we're beyond our timeout then return an error.
// This can only occur after we've attempted a flock once.
if t.IsZero() {
t = time.Now()
} else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout
}
flag := syscall.LOCK_SH
if exclusive {
flag = syscall.LOCK_EX
}

// Otherwise attempt to obtain an exclusive lock.
err := syscall.Flock(int(db.file.Fd()), flag|syscall.LOCK_NB)
if err == nil {
return nil
} else if err != syscall.EWOULDBLOCK {
return err
}

// Wait for a bit and try again.
time.Sleep(50 * time.Millisecond)
}
}

原来其一开始先定义一个变量,用一个死循环在运行这段代码,如果在设置的timeout时间段内仍然得不到这个文件的锁,则将返回一个超时错误,如果没有设置timeout,则其将一直等待,直到获得该文件的锁,即获得该文件的使用权,其实这与操作系统中的进程同步异步使用一个互斥锁的机制是相似的。接着再来看看其对于未创建过的数据库是如何创建的以及如何对其初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Initialize the database if it doesn't exist.
if info, err := db.file.Stat(); err != nil {
return nil, err
} else if info.Size() == 0 {
// Initialize new files with meta pages.
if err := db.init(); err != nil {
return nil, err
}
} else {
// Read the first meta page to determine the page size.
var buf [0x1000]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
m := db.pageInBuffer(buf[:], 0).meta()
if err := m.validate(); err != nil {
// If we can't read the page size, we can assume it's the same
// as the OS -- since that's how the page size was chosen in the
// first place.
//
// If the first page is invalid and this OS uses a different
// page size than what the database was created with then we
// are out of luck and cannot access the database.
db.pageSize = os.Getpagesize()
} else {
db.pageSize = int(m.pageSize)
}
}
}

其首先先对文件是否存在以及文件的类型进行判断,调用db.file.Stat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Stat returns the FileInfo structure describing file.
// If there is an error, it will be of type *PathError.
func (file *File) Stat() (FileInfo, error) {
if file == nil {
return nil, ErrInvalid
}

if file.isdir() {
// I don't know any better way to do that for directory
return Stat(file.dirinfo.path)
}
if isNulName(file.name) {
return &devNullStat, nil
}

ft, err := file.pfd.GetFileType()
if err != nil {
return nil, &PathError{"GetFileType", file.name, err}
}
switch ft {
case syscall.FILE_TYPE_PIPE, syscall.FILE_TYPE_CHAR:
return &fileStat{name: basename(file.name), filetype: ft}, nil
}

fs, err := newFileStatFromGetFileInformationByHandle(file.name, file.pfd.Sysfd)
if err != nil {
return nil, err
}
fs.filetype = ft
return fs, err
}

判断其实也是比较简单的吧,还是接着看其是如何初始化一个新的数据库db.init()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// init creates a new database file and initializes its meta pages.
func (db *DB) init() error {
// Set the page size to the OS page size.
db.pageSize = os.Getpagesize()

// Create two meta pages on a buffer.
buf := make([]byte, db.pageSize*4)
for i := 0; i < 2; i++ {
p := db.pageInBuffer(buf[:], pgid(i))
p.id = pgid(i)
p.flags = metaPageFlag

// Initialize the meta page.
m := p.meta()
m.magic = magic
m.version = version
m.pageSize = uint32(db.pageSize)
m.freelist = 2
m.root = bucket{root: 3}
m.pgid = 4
m.txid = txid(i)
m.checksum = m.sum64()
}

// Write an empty freelist at page 3.
p := db.pageInBuffer(buf[:], pgid(2))
p.id = pgid(2)
p.flags = freelistPageFlag
p.count = 0

// Write an empty leaf page at page 4.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
p.flags = leafPageFlag
p.count = 0

// Write the buffer to our data file.
if _, err := db.ops.writeAt(buf, 0); err != nil {
return err
}
if err := fdatasync(db); err != nil {
return err
}

return nil
}

看到这感觉还是能明白在干嘛的,其采用单个文件来将数据存储在磁盘上,该文件的前4个page是固定的,分别是 meta pagemeta pagefreelistleaf page,但是对于这4个page,到底是什么就让人很头大了。先看meta page的定义p.meta()
1
2
3
4
// meta returns a pointer to the metadata section of the page.
func (p *page) meta() *meta {
return (*meta)(unsafe.Pointer(&p.ptr))
}

还是先看看其基础的page的定义
1
2
3
4
5
6
7
type page struct {
id pgid
flags uint16
count uint16
overflow uint32
ptr uintptr
}

pageboltdb持久化时,与磁盘相关的数据结构。page的大小采用操作系统内存页的大小,即getpagesize系统调用的返回值。idpage的序号,flags表示page的类型,有branchPageFlag/leafPageFlag/metaPageFlag/freelistPageFlag几种,countpagefreelistPageFlag类型时,存储的是freelistpgid数组中元素的个数;当page时其他类型时,存储的是inode的个数,overflow记录page中数据量超过一个page所能存储的大小时候需要额外的page的数目
每个page对应对应一个磁盘上的数据块。这个数据块的layout为:

1
| page struct data | page element items | k-v pairs |

其分为3个部分:

  • 第一部分page struct data是该pageheader,存储的就是pagestruct的数据。
  • 第二部分page element items其实就是node的里inode的持久化部分数据。
  • 第三部分k-v pairs存储的是inode里具体的key-value数据。

接着就可以探索一下meta page是什么玩意了

1
2
3
4
5
6
7
8
9
10
11
type meta struct {
magic uint32 // 存储魔数0xED0CDAED
version uint32 // 标明存储格式的版本,现在是2
pageSize uint32 // 标明每个page的大小
flags uint32 // 当前已无用
root bucket // 根Bucket
freelist pgid // 标明当前freelist数据存在哪个page中
pgid pgid //
txid txid //
checksum uint64 // 以上数据的校验和,校验数据是否损坏
}

接着再看看freelist是什么,关于freelist的定义需要一层层不断找,比较繁琐
1
2
3
4
5
6
7
// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
ids []pgid // all free and available free page ids.
pending map[txid][]pgid // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
}

依赖注释我们已经可以大概猜测出freelist的作用,其是BoltDb实现磁盘空间的重复利用机制中需要用到的文件page缓存

ids记录了当前缓存着的空闲pagepgidcache中记录的也是这些pgid,采用map记录 方便快速查找。

当用户需要page时,调用freelist.allocate(n int) pgid,其中n为需要的page数量,其会遍历ids,从中 挑选出连续n个空闲的page,然后将其从缓存中剔除,然后将其实的page-id返回给调用者。当不存在满足需求的 page时,返回0,因为文件的起始2个page固定为meta page,因此有效的page-id不可能为0。

当某个写事务产生无用page时,将调用freelist.free(txid txid, p *page)将指定page p放入pending池和 cache中。当下一个写事务开启时,会将没有Tx引用的pending中的page搬移到ids缓存中。之所以这样做, 是为了支持事务的回滚和并发读事务,从而实现MVCC

当发起一个读事务时,Tx单独复制一份meta信息,从这份独有的meta作为入口,可以读出该meta指向的数据, 此时即使有一个写事务修改了相关key的数据,新修改的数据只会被写入新的page,读事务持有的page会进入pending 池,因此该读事务相关的数据并不会被修改。只有该page相关的读事务都结束时,才会从pending池进入到cache池 中,从而被复用修改。

当写事务更新数据时,并不直接覆盖老数据,而且分配一个新的page将更新后的数据写入,然后将老数据占用的page 放入pending池,建立新的索引。当事务需要回滚时,只需要将pending池中的page释放,将索引回滚即完成数据的 回滚。这样加速了事务的回滚。减少了事务缓存的内存使用,同时避免了对正在读的事务的干扰。

到这里我们也大概了解到BoltDb中部分的机制,说实话关于一些涉及底层方面的有点难以理解,主要关于数据库存储以及操作系统方面的知识面太浅了,之前没有好好学习,还是得先去好好看看数据库和文件系统相关以及存储相关的知识再继续尝试学习源码了吧

参考链接