-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrotation_bench_test.go
More file actions
134 lines (113 loc) · 3.32 KB
/
rotation_bench_test.go
File metadata and controls
134 lines (113 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package comet
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
// BenchmarkFileRotation measures the performance of file rotation under concurrent load
func BenchmarkFileRotation(b *testing.B) {
sizes := []int{
1024, // 1KB - extreme rotation
10 * 1024, // 10KB - high rotation
100 * 1024, // 100KB - moderate rotation
}
for _, maxSize := range sizes {
b.Run(fmt.Sprintf("MaxSize_%dKB", maxSize/1024), func(b *testing.B) {
dir := b.TempDir()
config := DefaultCometConfig()
config.Storage.MaxFileSize = int64(maxSize)
config.Storage.FlushInterval = 0 // Disable periodic flush
client, err := NewClient(dir, config)
if err != nil {
b.Fatal(err)
}
defer client.Close()
ctx := context.Background()
streamName := "bench:v1:shard:0000"
// Prepare data that will cause rotations
entrySize := 100 // 100 bytes per entry
data := make([]byte, entrySize)
for i := range data {
data[i] = byte(i % 256)
}
b.ResetTimer()
b.SetBytes(int64(entrySize))
// Run concurrent writers to stress rotation
var wg sync.WaitGroup
numWriters := 4
entriesPerWriter := b.N / numWriters
start := time.Now()
for w := 0; w < numWriters; w++ {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
for i := 0; i < entriesPerWriter; i++ {
entry := []byte(fmt.Sprintf("writer-%d-entry-%d-%s", writerID, i, data))
if _, err := client.Append(ctx, streamName, [][]byte{entry}); err != nil {
b.Errorf("Writer %d failed: %v", writerID, err)
return
}
}
}(w)
}
wg.Wait()
duration := time.Since(start)
// Calculate metrics
totalBytes := int64(b.N) * int64(entrySize)
throughputMBps := float64(totalBytes) / duration.Seconds() / (1024 * 1024)
rotations := totalBytes / int64(maxSize)
b.ReportMetric(throughputMBps, "MB/s")
b.ReportMetric(float64(rotations), "rotations")
b.ReportMetric(float64(duration.Nanoseconds())/float64(b.N), "ns/write")
})
}
}
// BenchmarkRotationLockContention specifically measures lock contention during rotation
func BenchmarkRotationLockContention(b *testing.B) {
dir := b.TempDir()
config := DefaultCometConfig()
config.Storage.MaxFileSize = 10 * 1024 // 10KB files for frequent rotation
client, err := NewClient(dir, config)
if err != nil {
b.Fatal(err)
}
defer client.Close()
ctx := context.Background()
streamName := "bench:v1:shard:0000"
// Pre-fill to be near rotation boundary
prefillData := make([]byte, 9*1024) // 9KB
client.Append(ctx, streamName, [][]byte{prefillData})
b.ResetTimer()
// Measure time for operations during rotation
var wg sync.WaitGroup
numReaders := 10
numWriters := 5
// Start readers that will be blocked during rotation
for r := 0; r < numReaders; r++ {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
for i := 0; i < b.N/numReaders; i++ {
// Try to read during rotation
shard, _ := client.getOrCreateShard(0)
shard.mu.RLock()
_ = len(shard.index.Files) // Simulate read operation
shard.mu.RUnlock()
}
}(r)
}
// Writers that trigger rotations
for w := 0; w < numWriters; w++ {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
data := make([]byte, 200) // Each write triggers rotation
for i := 0; i < b.N/numWriters; i++ {
client.Append(ctx, streamName, [][]byte{data})
}
}(w)
}
wg.Wait()
}