Skip to content

Commit be7a3f5

Browse files
committed
fix file transfer logic
1 parent 239d16b commit be7a3f5

5 files changed

Lines changed: 45 additions & 12 deletions

File tree

index/scorch/merge.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
363363

364364
var seg segment.Segment
365365
var filename string
366-
var trainingSample []float32
367366
if len(segmentsToMerge) > 0 {
368367
filename = zapFileName(newSegmentID)
369368
s.markIneligibleForRemoval(filename)

index/scorch/scorch.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,13 +1084,6 @@ func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error {
10841084
s.rootLock.Lock()
10851085
defer s.rootLock.Unlock()
10861086

1087-
// this code is currently specific to copying trained data but is future proofed for other files
1088-
// to be updated in the dest's bolt
1089-
err := s.trainer.copyFileLOCKED(file, d)
1090-
if err != nil {
1091-
return err
1092-
}
1093-
10941087
dest, err := d.GetWriter(filepath.Join("store", file))
10951088
if err != nil {
10961089
return err
@@ -1104,6 +1097,13 @@ func (s *Scorch) CopyFile(file string, d index.IndexDirectory) error {
11041097
defer source.Close()
11051098
defer dest.Close()
11061099
_, err = io.Copy(dest, source)
1100+
if err != nil {
1101+
return err
1102+
}
1103+
1104+
// this code is currently specific to copying trained data but is future proofed for other files
1105+
// to be updated in the dest's bolt
1106+
err = s.trainer.copyFileLOCKED(file, d)
11071107
return err
11081108
}
11091109

index/scorch/train_vector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,12 @@ func (t *vectorTrainer) updateBolt(snapshotsBucket *bolt.Bucket, key []byte, val
315315
if err != nil {
316316
return err
317317
}
318+
319+
// update the centroid index pointer
320+
t.centroidIndex, err = t.parent.loadSegment(trainerBucket)
321+
if err != nil {
322+
return err
323+
}
318324
}
319325

320326
return nil

index_alias_impl.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,6 @@ func (i *indexAliasImpl) IndexSynonym(id string, collection string, definition *
106106
func (i *indexAliasImpl) Train(batch *Batch) error {
107107
i.mutex.RLock()
108108
defer i.mutex.RUnlock()
109-
if !i.open {
110-
return ErrorIndexClosed
111-
}
112109

113110
if !i.open {
114111
return ErrorIndexClosed

index_impl.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1432,7 +1432,6 @@ func (m *searchHitSorter) Less(i, j int) bool {
14321432
return c < 0
14331433
}
14341434

1435-
// CopyTo (index.Directory, filter)
14361435
func (i *indexImpl) CopyTo(d index.Directory) (err error) {
14371436
i.mutex.RLock()
14381437
defer i.mutex.RUnlock()
@@ -1594,3 +1593,35 @@ func (i *indexImpl) buildTopNCollector(ctx context.Context, req *SearchRequest,
15941593
}
15951594
return newCollector(), nil
15961595
}
1596+
1597+
func (i *indexImpl) CopyFile(file string, d index.IndexDirectory) (err error) {
1598+
i.mutex.RLock()
1599+
defer i.mutex.RUnlock()
1600+
1601+
if !i.open {
1602+
return ErrorIndexClosed
1603+
}
1604+
1605+
copyIndex, ok := i.i.(index.IndexFileCopyable)
1606+
if !ok {
1607+
return fmt.Errorf("index implementation does not support copy reader")
1608+
}
1609+
1610+
return copyIndex.CopyFile(file, d)
1611+
}
1612+
1613+
func (i *indexImpl) UpdateFileInBolt(key []byte, value []byte) error {
1614+
i.mutex.RLock()
1615+
defer i.mutex.RUnlock()
1616+
1617+
if !i.open {
1618+
return ErrorIndexClosed
1619+
}
1620+
1621+
copyIndex, ok := i.i.(index.IndexFileCopyable)
1622+
if !ok {
1623+
return fmt.Errorf("index implementation does not support file copy")
1624+
}
1625+
1626+
return copyIndex.UpdateFileInBolt(key, value)
1627+
}

0 commit comments

Comments
 (0)