Skip to content

Commit 5856192

Browse files
committed
cleanup trainer after complete
1 parent 792bfad commit 5856192

3 files changed

Lines changed: 22 additions & 34 deletions

File tree

index/scorch/snapshot_segment.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,9 @@ func (c *cachedMeta) updateMeta(field string, val interface{}) {
370370

371371
func (c *cachedMeta) fetchMeta(field string) (rv interface{}) {
372372
c.m.RLock()
373+
if c.meta == nil {
374+
return nil
375+
}
373376
rv = c.meta[field]
374377
c.m.RUnlock()
375378
return rv

index/scorch/train_vector.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ func (t *vectorTrainer) trainLoop() {
9393
t.parent.asyncTasks.Done()
9494
}()
9595
// initialize stuff
96-
totalSamplesProcessed := t.centroidIndex.cachedMeta.fetchMeta("trainedSamples").(uint64)
96+
var totalSamplesProcessed uint64
97+
if t.centroidIndex != nil {
98+
totalSamplesProcessed = t.centroidIndex.cachedMeta.fetchMeta("trainedSamples").(uint64)
99+
}
97100
buf := make([]byte, binary.MaxVarintLen64)
98101
t.parent.segmentConfig[index.CentroidIndexCallback] = t.getCentroidIndex
99102
path := filepath.Join(t.parent.path, index.CentroidIndexFileName)
@@ -230,9 +233,23 @@ func (t *vectorTrainer) trainLoop() {
230233
}
231234
t.m.Lock()
232235
t.centroidIndex = &SegmentSnapshot{
233-
segment: centroidIndex,
236+
segment: centroidIndex,
237+
cachedMeta: &cachedMeta{meta: nil},
234238
}
235239
t.m.Unlock()
240+
241+
// if the train complete flag has been set, exit the routine and cleanup
242+
if trainReq.trainComplete {
243+
// cleanup .tmp file if it exists
244+
if _, err := os.Stat(path + ".tmp"); err == nil {
245+
err = os.Remove(path + ".tmp")
246+
if err != nil {
247+
trainReq.ackCh <- fmt.Errorf("error removing .tmp file: %v", err)
248+
}
249+
}
250+
close(trainReq.ackCh)
251+
return
252+
}
236253
close(trainReq.ackCh)
237254
}
238255
}

index_impl.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,38 +1465,6 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) {
14651465
return i.meta.CopyTo(d)
14661466
}
14671467

1468-
func (i *indexImpl) CopyFile(file string, d index.IndexDirectory) (err error) {
1469-
i.mutex.RLock()
1470-
defer i.mutex.RUnlock()
1471-
1472-
if !i.open {
1473-
return ErrorIndexClosed
1474-
}
1475-
1476-
copyIndex, ok := i.i.(index.IndexFileCopyable)
1477-
if !ok {
1478-
return fmt.Errorf("index implementation does not support copy reader")
1479-
}
1480-
1481-
return copyIndex.CopyFile(file, d)
1482-
}
1483-
1484-
func (i *indexImpl) UpdateFileInBolt(key []byte, value []byte) error {
1485-
i.mutex.RLock()
1486-
defer i.mutex.RUnlock()
1487-
1488-
if !i.open {
1489-
return ErrorIndexClosed
1490-
}
1491-
1492-
copyIndex, ok := i.i.(index.IndexFileCopyable)
1493-
if !ok {
1494-
return fmt.Errorf("index implementation does not support file copy")
1495-
}
1496-
1497-
return copyIndex.UpdateFileInBolt(key, value)
1498-
}
1499-
15001468
func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser,
15011469
error,
15021470
) {

0 commit comments

Comments
 (0)