-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdebug_offset_test.go
More file actions
70 lines (55 loc) · 1.83 KB
/
debug_offset_test.go
File metadata and controls
70 lines (55 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package comet
import (
"context"
"testing"
)
// TestDebugOffsetCalculation debugs how offsets are calculated
func TestDebugOffsetCalculation(t *testing.T) {
dir := t.TempDir()
client, err := NewClient(dir, DefaultCometConfig())
if err != nil {
t.Fatal(err)
}
defer client.Close()
ctx := context.Background()
stream := "test:v1:shard:0000"
// Write entries one by one and check offsets
expectedOffset := int64(0)
for i := 0; i < 5; i++ {
data := []byte("x") // 1 byte entry
// Calculate expected size: 12 (header) + 1 (data) = 13 bytes per entry
entrySize := int64(12 + len(data))
t.Logf("Writing entry %d, expecting it at offset %d", i, expectedOffset)
_, err := client.Append(ctx, stream, [][]byte{data})
if err != nil {
t.Fatal(err)
}
// Sync to make it durable
client.Sync(ctx)
// Check shard state
shard, _ := client.getOrCreateShard(0)
shard.mu.RLock()
currentWriteOffset := shard.index.CurrentWriteOffset
currentEntryNum := shard.index.CurrentEntryNumber
if len(shard.index.Files) > 0 {
file := shard.index.Files[0]
t.Logf("After entry %d: CurrentWriteOffset=%d, CurrentEntryNumber=%d, File.EndOffset=%d, File.Entries=%d",
i, currentWriteOffset, currentEntryNum, file.EndOffset, file.Entries)
}
shard.mu.RUnlock()
// Verify the offset matches our calculation
if currentWriteOffset != expectedOffset+entrySize {
t.Errorf("Entry %d: expected CurrentWriteOffset=%d, got %d",
i, expectedOffset+entrySize, currentWriteOffset)
}
expectedOffset += entrySize
}
// Now try to read all entries and see what offsets are used
consumer := NewConsumer(client, ConsumerOptions{Group: "test"})
defer consumer.Close()
messages, err := consumer.Read(ctx, []uint32{0}, 10)
if err != nil {
t.Fatalf("Failed to read: %v", err)
}
t.Logf("Successfully read %d messages", len(messages))
}