Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion util/rowcodec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
// CodecVer is the constant number that represent the new row format.
const CodecVer = 128

var errInvalidCodecVer = errors.New("invalid codec version")
var (
// errInvalidCodecVer is returned when a row's leading version byte is not CodecVer.
errInvalidCodecVer = errors.New("invalid codec version")
// errInvalidChecksumVer is returned when the version bits of the checksum header are non-zero.
errInvalidChecksumVer = errors.New("invalid checksum version")
)

// First byte in the encoded value which specifies the encoding type.
const (
Expand Down
65 changes: 54 additions & 11 deletions util/rowcodec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,69 @@ type Encoder struct {
values []*types.Datum
// Enable indicates whether this encoder should be used.
Enable bool
// WithChecksum indicates whether to append checksum to the encoded row data.
WithChecksum bool
}

// Encode serializes the given column IDs and datums into the row format.
// The encoder state is reset first, and the result is written into buf's
// backing storage starting at offset zero.
func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte) ([]byte, error) {
	encoder.reset()
	if err := encoder.encodeDatums(sc, colIDs, values); err != nil {
		return nil, err
	}
	encoded := encoder.row.toBytes(buf[:0])
	return encoded, nil
}

// EncodeWithExtraChecksum is like Encode, but also records the given extra checksum
// when checksums are enabled on this encoder. If checksums are not enabled, the
// checksum argument is ignored.
func (encoder *Encoder) EncodeWithExtraChecksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, checksum uint32, buf []byte) ([]byte, error) {
Comment thread
cfzjywxk marked this conversation as resolved.
encoder.reset()
// reset re-applies the checksum flag from WithChecksum, so hasChecksum reflects that setting here.
if encoder.hasChecksum() {
encoder.setExtraChecksum(checksum)
}
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return nil, err
}
return encoder.row.toBytes(buf[:0]), nil
}

// Checksum calculates the checksum of the datums.
// It turns the checksum flag on for this call regardless of WithChecksum, encodes the
// datums, and returns the resulting row-data checksum without serializing the row.
func (encoder *Encoder) Checksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) (uint32, error) {
Comment thread
cfzjywxk marked this conversation as resolved.
encoder.reset()
// Force checksum calculation even when the encoder was not configured with WithChecksum.
encoder.flags |= rowFlagChecksum
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return 0, err
}
return encoder.checksum1, nil
}

// encodeDatums runs the three encoding stages on the encoder's row state:
// appendColVals, reformatCols and encodeRowCols. It returns the error from
// encodeRowCols, if any.
func (encoder *Encoder) encodeDatums(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) error {
encoder.appendColVals(colIDs, values)
numCols, notNullIdx := encoder.reformatCols()
err := encoder.encodeRowCols(sc, numCols, notNullIdx)
if err != nil {
return nil, err
return err
}
return encoder.row.toBytes(buf[:0]), nil
return nil
}

// reset clears the per-row state so the encoder can be reused for another row.
// Slices are truncated to length zero rather than reallocated, so their capacity is kept.
func (encoder *Encoder) reset() {
encoder.large = false
encoder.flags = 0
encoder.numNotNullCols = 0
encoder.numNullCols = 0
encoder.data = encoder.data[:0]
encoder.tempColIDs = encoder.tempColIDs[:0]
encoder.values = encoder.values[:0]
encoder.offsets32 = encoder.offsets32[:0]
encoder.offsets = encoder.offsets[:0]
encoder.checksumHeader = 0
encoder.checksum1 = 0
encoder.checksum2 = 0
// The flags were zeroed above; restore the checksum bit when the encoder is configured for it.
if encoder.WithChecksum {
encoder.flags |= rowFlagChecksum
}
}

func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {
Expand All @@ -67,7 +107,7 @@ func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {

func (encoder *Encoder) appendColVal(colID int64, d *types.Datum) {
if colID > 255 {
encoder.large = true
encoder.flags |= rowFlagLarge
}
if d.IsNull() {
encoder.numNullCols++
Expand All @@ -83,7 +123,7 @@ func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
numCols = len(encoder.tempColIDs)
nullIdx := numCols - int(r.numNullCols)
notNullIdx = 0
if r.large {
if r.large() {
r.initColIDs32()
r.initOffsets32()
} else {
Expand All @@ -92,14 +132,14 @@ func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
}
for i, colID := range encoder.tempColIDs {
if encoder.values[i].IsNull() {
if r.large {
if r.large() {
r.colIDs32[nullIdx] = uint32(colID)
} else {
r.colIDs[nullIdx] = byte(colID)
}
nullIdx++
} else {
if r.large {
if r.large() {
r.colIDs32[notNullIdx] = uint32(colID)
} else {
r.colIDs[notNullIdx] = byte(colID)
Expand All @@ -108,7 +148,7 @@ func (encoder *Encoder) reformatCols() (numCols, notNullIdx int) {
notNullIdx++
}
}
if r.large {
if r.large() {
largeNotNullSorter := (*largeNotNullSorter)(encoder)
sort.Sort(largeNotNullSorter)
if r.numNullCols > 0 {
Expand Down Expand Up @@ -136,7 +176,7 @@ func (encoder *Encoder) encodeRowCols(sc *stmtctx.StatementContext, numCols, not
return err
}
// handle convert to large
if len(r.data) > math.MaxUint16 && !r.large {
if len(r.data) > math.MaxUint16 && !r.large() {
r.initColIDs32()
for j := 0; j < numCols; j++ {
r.colIDs32[j] = uint32(r.colIDs[j])
Expand All @@ -145,14 +185,17 @@ func (encoder *Encoder) encodeRowCols(sc *stmtctx.StatementContext, numCols, not
for j := 0; j <= i; j++ {
r.offsets32[j] = uint32(r.offsets[j])
}
r.large = true
r.flags |= rowFlagLarge
}
if r.large {
if r.large() {
r.offsets32[i] = uint32(len(r.data))
} else {
r.offsets[i] = uint16(len(r.data))
}
}
if r.hasChecksum() {
return r.calcChecksum()
}
return nil
}

Expand Down
155 changes: 136 additions & 19 deletions util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,98 @@ package rowcodec

import (
"encoding/binary"
"hash/crc32"
)

// row is the struct type used to access a row.
const (
// rowFlagLarge (0x01) marks a row whose column IDs and offsets are stored as 4-byte values.
rowFlagLarge byte = 1 << iota
// rowFlagChecksum (0x02) marks a row that carries a checksum section after the column data.
rowFlagChecksum
)

const (
// checksumMaskVersion selects the low three bits of the checksum header, which hold the checksum version.
checksumMaskVersion byte = 0b0111
// checksumFlagExtra is the checksum header bit indicating that an extra checksum follows the first one.
checksumFlagExtra byte = 0b1000
)

// row is the struct type used to access a row and the row format is shown as the following.
//
// Row Format
//
// 0 1 2 3
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | VER | FLAGS | NOT_NULL_COL_CNT |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | NULL_COL_CNT | ...NOT_NULL_COL_IDS... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ...NULL_COL_IDS... | ...NOT_NULL_COL_OFFSETS... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ...NOT_NULL_COL_DATA... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ...CHECKSUM... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// - FLAGS
// - 0x01: large (when max(col_ids) > 255 or len(col_data) > max_u16)
// - size of col_id = large ? 4 : 1
// - size of col_offset = large ? 4 : 2
// - 0x02: has checksum
//
// Checksum
//
// 0 1 2 3 4 5 6 7 8
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | |E| VER | CHECKSUM | EXTRA_CHECKSUM(OPTIONAL) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// HEADER
//
// - HEADER
// - VER: version
// - E: has extra checksum
// - CHECKSUM
// - little-endian CRC32(IEEE) when hdr.ver = 0 (default)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type row struct {
// small: colID []byte, offsets []uint16, optimized for most cases.
// large: colID []uint32, offsets []uint32.
large bool
// flags holds the row flag bits (rowFlagLarge, rowFlagChecksum).
flags byte
// checksumHeader holds the checksum version bits and the extra-checksum bit.
checksumHeader byte
numNotNullCols uint16
numNullCols uint16
colIDs []byte

// for small row: colID []byte, offsets []uint16, optimized for most cases.
colIDs []byte
offsets []uint16
data []byte

// for large row
// for large row: colID []uint32, offsets []uint32.
colIDs32 []uint32
offsets32 []uint32

data []byte
// checksum1 is the CRC32 (IEEE) of data, set by calcChecksum or read back by fromBytes.
checksum1 uint32
// checksum2 is the optional extra checksum; it is only meaningful when the extra bit is set in checksumHeader.
checksum2 uint32
}

// large reports whether the rowFlagLarge bit is set in the row flags.
func (r *row) large() bool {
	return r.flags&rowFlagLarge != 0
}

// hasChecksum reports whether the rowFlagChecksum bit is set in the row flags.
func (r *row) hasChecksum() bool {
	return r.flags&rowFlagChecksum != 0
}

// hasExtraChecksum reports whether the extra-checksum bit is set in the checksum header.
func (r *row) hasExtraChecksum() bool {
	return r.checksumHeader&checksumFlagExtra != 0
}

// checksumVersion extracts the version bits from the checksum header.
func (r *row) checksumVersion() int {
	ver := r.checksumHeader & checksumMaskVersion
	return int(ver)
}

// calcChecksum stores the CRC32 (IEEE) of the row data in checksum1.
// Only checksum version 0 is supported; any other version yields
// errInvalidChecksumVer and leaves checksum1 untouched.
func (r *row) calcChecksum() error {
	if ver := r.checksumVersion(); ver != 0 {
		return errInvalidChecksumVer
	}
	sum := crc32.ChecksumIEEE(r.data)
	r.checksum1 = sum
	return nil
}

// setExtraChecksum stores v as the extra checksum and sets the extra-checksum
// bit in the checksum header.
func (r *row) setExtraChecksum(v uint32) {
	r.checksum2 = v
	r.checksumHeader |= checksumFlagExtra
}

func (r *row) getData(i int) []byte {
var start, end uint32
if r.large {
if r.large() {
if i > 0 {
start = r.offsets32[i-1]
}
Expand All @@ -55,46 +125,76 @@ func (r *row) fromBytes(rowData []byte) error {
if rowData[0] != CodecVer {
return errInvalidCodecVer
}
r.large = rowData[1]&1 > 0
r.flags = rowData[1]
r.numNotNullCols = binary.LittleEndian.Uint16(rowData[2:])
r.numNullCols = binary.LittleEndian.Uint16(rowData[4:])
cursor := 6
if r.large {
lastOffset := 0
if r.large() {
colIDsLen := int(r.numNotNullCols+r.numNullCols) * 4
r.colIDs32 = bytesToU32Slice(rowData[cursor : cursor+colIDsLen])
cursor += colIDsLen
offsetsLen := int(r.numNotNullCols) * 4
r.offsets32 = bytesToU32Slice(rowData[cursor : cursor+offsetsLen])
cursor += offsetsLen
if n := len(r.offsets32); n > 0 {
lastOffset = int(r.offsets32[n-1])
}
} else {
colIDsLen := int(r.numNotNullCols + r.numNullCols)
r.colIDs = rowData[cursor : cursor+colIDsLen]
cursor += colIDsLen
offsetsLen := int(r.numNotNullCols) * 2
r.offsets = bytes2U16Slice(rowData[cursor : cursor+offsetsLen])
cursor += offsetsLen
if n := len(r.offsets); n > 0 {
lastOffset = int(r.offsets[n-1])
}
}
r.data = rowData[cursor : cursor+lastOffset]
cursor += lastOffset

if r.hasChecksum() {
r.checksumHeader = rowData[cursor]
if r.checksumVersion() != 0 {
return errInvalidChecksumVer
}
cursor++
r.checksum1 = binary.LittleEndian.Uint32(rowData[cursor:])
cursor += 4
if r.hasExtraChecksum() {
r.checksum2 = binary.LittleEndian.Uint32(rowData[cursor:])
} else {
r.checksum2 = 0
}
} else {
r.checksumHeader = 0
r.checksum1 = 0
r.checksum2 = 0
}
r.data = rowData[cursor:]
return nil
}

// toBytes appends the serialized row to buf and returns the extended slice.
// The order written is: codec version, flags, column counts, column IDs,
// offsets, column data, and then the checksum section when the row has one.
func (r *row) toBytes(buf []byte) []byte {
buf = append(buf, CodecVer)
flag := byte(0)
if r.large {
flag = 1
}
buf = append(buf, flag)
buf = append(buf, r.flags)
// Both counts are written as 16-bit values, low byte first.
buf = append(buf, byte(r.numNotNullCols), byte(r.numNotNullCols>>8))
buf = append(buf, byte(r.numNullCols), byte(r.numNullCols>>8))
if r.large {
if r.large() {
buf = append(buf, u32SliceToBytes(r.colIDs32)...)
buf = append(buf, u32SliceToBytes(r.offsets32)...)
} else {
buf = append(buf, r.colIDs...)
buf = append(buf, u16SliceToBytes(r.offsets)...)
}
buf = append(buf, r.data...)
// Checksum section: one header byte, the 4-byte checksum, then the optional 4-byte extra checksum.
if r.hasChecksum() {
buf = append(buf, r.checksumHeader)
buf = binary.LittleEndian.AppendUint32(buf, r.checksum1)
if r.hasExtraChecksum() {
buf = binary.LittleEndian.AppendUint32(buf, r.checksum2)
}
}
return buf
}

Expand All @@ -105,7 +205,7 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
var v int64
if r.large {
if r.large() {
v = int64(r.colIDs32[h])
} else {
v = int64(r.colIDs[h])
Expand All @@ -126,7 +226,7 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
var v int64
if r.large {
if r.large() {
v = int64(r.colIDs32[h])
} else {
v = int64(r.colIDs[h])
Expand All @@ -144,6 +244,23 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
return
}

// GetChecksum returns the checksum of the row data (not-null columns).
// The boolean is false when the row carries no checksum.
func (r *row) GetChecksum() (uint32, bool) {
	if r.hasChecksum() {
		return r.checksum1, true
	}
	return 0, false
}

// GetExtraChecksum returns the extra checksum, which shall be calculated in the
// last stable schema version (whose elements are all public). The boolean is
// false when the row carries no extra checksum.
func (r *row) GetExtraChecksum() (uint32, bool) {
	if r.hasExtraChecksum() {
		return r.checksum2, true
	}
	return 0, false
}

// ColumnIsNull returns if the column value is null. Mainly used for count column aggregation.
// This method will be used in unistore.
func (r *row) ColumnIsNull(rowData []byte, colID int64, defaultVal []byte) (bool, error) {
Expand Down
Loading