-
Notifications
You must be signed in to change notification settings - Fork 92
/
segment.go
156 lines (133 loc) · 3.75 KB
/
segment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package pogreb
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
)
type recordType int
const (
recordTypePut recordType = iota
recordTypeDelete
segmentExt = ".psg"
)
// segment is a write-ahead log segment.
// It consists of a sequence of binary-encoded variable length records.
type segment struct {
*file
id uint16 // Physical segment identifier.
sequenceID uint64 // Logical monotonically increasing segment identifier.
name string
meta *segmentMeta
}
func segmentName(id uint16, sequenceID uint64) string {
return fmt.Sprintf("%05d-%d%s", id, sequenceID, segmentExt)
}
type segmentMeta struct {
Full bool
PutRecords uint32
DeleteRecords uint32
DeletedKeys uint32
DeletedBytes uint32
}
func segmentMetaName(id uint16, sequenceID uint64) string {
return segmentName(id, sequenceID) + metaExt
}
// Binary representation of a segment record:
// +---------------+------------------+------------------+-...-+--...--+----------+
// | Key Size (2B) | Record Type (1b) | Value Size (31b) | Key | Value | CRC (4B) |
// +---------------+------------------+------------------+-...-+--...--+----------+
type record struct {
rtype recordType
segmentID uint16
offset uint32
data []byte
key []byte
value []byte
}
func encodedRecordSize(kvSize uint32) uint32 {
// key size, value size, key, value, crc32
return 2 + 4 + kvSize + 4
}
func encodeRecord(key []byte, value []byte, rt recordType) []byte {
size := encodedRecordSize(uint32(len(key) + len(value)))
data := make([]byte, size)
binary.LittleEndian.PutUint16(data[:2], uint16(len(key)))
valLen := uint32(len(value))
if rt == recordTypeDelete { // Set delete bit.
valLen |= 1 << 31
}
binary.LittleEndian.PutUint32(data[2:], valLen)
copy(data[6:], key)
copy(data[6+len(key):], value)
checksum := crc32.ChecksumIEEE(data[:6+len(key)+len(value)])
binary.LittleEndian.PutUint32(data[size-4:size], checksum)
return data
}
func encodePutRecord(key []byte, value []byte) []byte {
return encodeRecord(key, value, recordTypePut)
}
func encodeDeleteRecord(key []byte) []byte {
return encodeRecord(key, nil, recordTypeDelete)
}
// segmentIterator iterates over segment records.
type segmentIterator struct {
f *segment
offset uint32
r *bufio.Reader
buf []byte // kv size and crc32 reusable buffer.
}
func newSegmentIterator(f *segment) (*segmentIterator, error) {
if _, err := f.Seek(int64(headerSize), io.SeekStart); err != nil {
return nil, err
}
return &segmentIterator{
f: f,
offset: headerSize,
r: bufio.NewReader(f),
buf: make([]byte, 6),
}, nil
}
func (it *segmentIterator) next() (record, error) {
// Read key and value size.
kvSizeBuf := it.buf
if _, err := io.ReadFull(it.r, kvSizeBuf); err != nil {
if err == io.EOF {
return record{}, ErrIterationDone
}
return record{}, err
}
// Decode key size.
keySize := uint32(binary.LittleEndian.Uint16(kvSizeBuf[:2]))
// Decode value size and record type.
rt := recordTypePut
valueSize := binary.LittleEndian.Uint32(kvSizeBuf[2:])
if valueSize&(1<<31) != 0 {
rt = recordTypeDelete
valueSize &^= 1 << 31
}
// Read key, value and checksum.
recordSize := encodedRecordSize(keySize + valueSize)
data := make([]byte, recordSize)
copy(data, kvSizeBuf)
if _, err := io.ReadFull(it.r, data[6:]); err != nil {
return record{}, err
}
// Verify checksum.
checksum := binary.LittleEndian.Uint32(data[len(data)-4:])
if checksum != crc32.ChecksumIEEE(data[:len(data)-4]) {
return record{}, errCorrupted
}
offset := it.offset
it.offset += recordSize
rec := record{
rtype: rt,
segmentID: it.f.id,
offset: offset,
data: data,
key: data[6 : 6+keySize],
value: data[6+keySize : 6+keySize+valueSize],
}
return rec, nil
}