-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.go
232 lines (210 loc) · 4.01 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package mini_kv_db
import (
"io"
"os"
"sync"
)
const (
SET uint16 = 0
DEL uint16 = 1
)
type MiniKVDB struct {
indexes map[string]int64 //内存中保存的文件真实的物理索引位置
dbFile *DbFile //数据文件
dirPath string //数据目录
mu sync.RWMutex //读写同步锁
}
//
// Open
// @Description: Open a MiniKVDB instance,打开一个数据库实例
// @param dirPath
// @return *MiniKVDB
// @return error
//
func Open(dirPath string) (*MiniKVDB, error) {
_, err := os.Stat(dirPath)
{
if os.IsNotExist(err) {
err := os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
return nil, err
}
}
}
//加载数据文件
dbFile, err := NewDbFile(dirPath)
if err != nil {
return nil, err
}
db := &MiniKVDB{
dbFile: dbFile,
dirPath: dirPath,
indexes: make(map[string]int64),
}
//加载索引
db.loadIndexesFromFile()
return db, err
}
//
// Merge
// @Description: 合并数据文件
// @receiver db
//
func (db *MiniKVDB) Merge() error {
if db.dbFile.Offset == 0 {
return nil
}
var (
validEntries []*Entry
offset int64
)
for {
e, err := db.dbFile.Read(offset)
if err != nil {
if err == io.EOF {
break
}
return err
}
//读取一个索引,看是否有效
if off, ok := db.indexes[string(e.Key)]; ok && off == offset {
validEntries = append(validEntries, e)
}
offset += e.GetSize()
}
if len(validEntries) <= 0 {
return nil
}
//新建临时文件
mergeDbFile, err := NewMergeDbFile(db.dirPath)
if err != nil {
return err
}
defer os.Remove(mergeDbFile.File.Name())
//重新写入有效的entry
for _, entry := range validEntries {
writeOffset := mergeDbFile.Offset
err := mergeDbFile.Write(entry)
if err != nil {
return err
}
//更新索引
db.indexes[string(entry.Key)] = writeOffset
}
//获取文件名
dbFileName := db.dbFile.File.Name()
db.dbFile.File.Close()
//删除旧的数据文件
os.Remove(dbFileName)
//合并好的文件名
mergeDbFileName := mergeDbFile.File.Name()
mergeDbFile.File.Close()
os.Rename(mergeDbFileName, db.dirPath+string(os.PathSeparator)+FileName)
db.dbFile = mergeDbFile
return nil
}
//
// Set
// @Description: 设置数据,类似Redis的set方法
// @receiver db
// @param key
// @param value
// @return err
//
func (db *MiniKVDB) Set(key []byte, value []byte) (err error) {
if len(key) == 0 {
return
}
db.mu.Lock()
defer db.mu.Unlock()
offset := db.dbFile.Offset
//封装成entry
entry := NewEntry(key, value, SET)
//写入数据文件
err = db.dbFile.Write(entry)
if err != nil {
return err
}
//写入内存索引
db.indexes[string(key)] = offset
return
}
//
// Get
// @Description: 获取数据,类似Redis的get方法
// @receiver db
// @param key
// @return val
// @return err
//
func (db *MiniKVDB) Get(key []byte) (val []byte, err error) {
if len(key) == 0 {
return
}
db.mu.Lock()
defer db.mu.Unlock()
//从内存中去除索引
offset, ok := db.indexes[string(key)]
if !ok {
//key不存在
return
}
entry, err := db.dbFile.Read(offset)
if err != nil && err != io.EOF {
return
}
if entry != nil {
val = entry.Value
}
return
}
//
// Del
// @Description: 删除数据
// @receiver db
// @param key
// @return error
//
func (db *MiniKVDB) Del(key []byte) (err error) {
if len(key) == 0 {
return
}
db.mu.Lock()
defer db.mu.Unlock()
//如果内存中已经没有,则直接返回
_, ok := db.indexes[string(key)]
if !ok {
return
}
//Step1往文件写一个删除的entry
entry := NewEntry(key, nil, DEL)
err = db.dbFile.Write(entry)
if err != nil {
return err
}
//Step2删除内存中的数据
delete(db.indexes, string(key))
return nil
}
func (db *MiniKVDB) loadIndexesFromFile() {
if db.dbFile == nil {
return
}
var offset int64
for {
e, err := db.dbFile.Read(offset)
if err != nil {
//读取完毕
if err == io.EOF {
break
}
return
}
//设置索引状态
db.indexes[string(e.Key)] = offset
if e.Mark == DEL {
delete(db.indexes, string(e.Key))
}
offset = offset + e.GetSize()
}
}