Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 112 additions & 22 deletions pkg/modelagent/gopher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package modelagent

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -324,31 +325,35 @@ func (s *Gopher) processTask(task *GopherTask) error {
s.logger.Errorf("Failed to get target directory path for model %s: %v", modelInfo, err)
return err
}
err = utils.Retry(s.downloadRetry, 100*time.Millisecond, func() error {
downloadErr := s.downloadModel(ctx, osUri, destPath, task)
if downloadErr != nil {
// Check if context was cancelled
if ctx.Err() != nil {
s.logger.Infof("Download cancelled for model %s: %v", modelInfo, ctx.Err())
return ctx.Err()
if matchedKey, reused := s.findReadyObjectStorageModelWithSamePath(ctx, task, baseModelSpec, destPath); reused {
s.logger.Infof("Reusing Ready same-path model artifact for %s/%s from %s at %s", namespace, name, matchedKey, destPath)
} else {
err = utils.Retry(s.downloadRetry, 100*time.Millisecond, func() error {
downloadErr := s.downloadModel(ctx, osUri, destPath, task)
if downloadErr != nil {
// Check if context was cancelled
if ctx.Err() != nil {
s.logger.Infof("Download cancelled for model %s: %v", modelInfo, ctx.Err())
return ctx.Err()
}
s.logger.Errorf("Failed to download model %s (attempt %d/%d): %v",
modelInfo, s.downloadRetry, s.downloadRetry, downloadErr)
}
s.logger.Errorf("Failed to download model %s (attempt %d/%d): %v",
modelInfo, s.downloadRetry, s.downloadRetry, downloadErr)
}
return downloadErr
})
if err != nil {
s.logger.Errorf("All download attempts failed for model %s: %v", modelInfo, err)
return downloadErr
})
if err != nil {
s.logger.Errorf("All download attempts failed for model %s: %v", modelInfo, err)

// Record download failure in metrics
errorType := "download_error"
if strings.Contains(err.Error(), "MD5") {
errorType = "md5_verification_error"
}
s.metrics.RecordFailedDownload(modelType, namespace, name, errorType)
// Record download failure in metrics
errorType := "download_error"
if strings.Contains(err.Error(), "MD5") {
errorType = "md5_verification_error"
}
s.metrics.RecordFailedDownload(modelType, namespace, name, errorType)

s.markModelOnNodeFailed(task)
return err
s.markModelOnNodeFailed(task)
return err
}
}
// Parse model config and update ConfigMap
// We can pass either BaseModel or ClusterBaseModel based on the task's model type
Expand Down Expand Up @@ -736,6 +741,91 @@ func (s *Gopher) createOCIOSDataStore(baseModelSpec v1beta1.BaseModelSpec) (*oci
return ociOSDS, nil
}

// findReadyObjectStorageModelWithSamePath looks for a Ready OCI Object Storage
// model entry on this node that resolves to the same local destination path.
// This is intentionally independent of downloadPolicy: copied model CRs with
// the same source and destination can reuse Ready local files, while normal
// downloadPolicy behavior still applies when no same-path Ready artifact exists.
func (s *Gopher) findReadyObjectStorageModelWithSamePath(ctx context.Context, task *GopherTask, baseModelSpec v1beta1.BaseModelSpec, destPath string) (string, bool) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a comment for this func

if task == nil || (task.BaseModel == nil && task.ClusterBaseModel == nil) {
return "", false
}
if baseModelSpec.Storage == nil || baseModelSpec.Storage.StorageUri == nil || baseModelSpec.Storage.Path == nil {
return "", false
}
if s.configMapReconciler == nil {
return "", false
}
if _, err := os.Stat(destPath); err != nil {
s.logger.Warnf("Cannot reuse same-path model artifact at %s because it is not available locally: %v", destPath, err)
return "", false
}

configMap, err := s.configMapReconciler.getConfigMap(ctx)
if err != nil {
s.logger.Warnf("Cannot inspect node ConfigMap for same-path model reuse: %v", err)
return "", false
}

currentKey := s.configMapReconciler.getModelConfigMapKey(task.BaseModel, task.ClusterBaseModel)
if s.clusterBaseModelLister != nil {
clusterBaseModels, err := s.clusterBaseModelLister.List(labels.Everything())
if err == nil {
for _, model := range clusterBaseModels {
key := constants.GetModelConfigMapKey("", model.Name, true)
if key != currentKey && isReadyModelEntry(configMap.Data[key]) &&
sameModelStoragePath(baseModelSpec.Storage, model.Spec.Storage, s.modelRootDir, destPath) {
return key, true
}
}
} else {
s.logger.Warnf("Cannot list ClusterBaseModels for same-path model reuse: %v", err)
}
}

if s.baseModelLister != nil {
baseModels, err := s.baseModelLister.List(labels.Everything())
if err == nil {
for _, model := range baseModels {
key := constants.GetModelConfigMapKey(model.Namespace, model.Name, false)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we can extract reusable part between cluster basemodel and basemodel.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I kept the BaseModel and ClusterBaseModel lookup loops separate because that matches the existing repo pattern: they are different CR types with different listers and namespace/key semantics. The shared comparison logic is already extracted into sameModelStoragePath, so I’d avoid a broader refactor in this focused PR.

if key != currentKey && isReadyModelEntry(configMap.Data[key]) &&
sameModelStoragePath(baseModelSpec.Storage, model.Spec.Storage, s.modelRootDir, destPath) {
return key, true
}
}
} else {
s.logger.Warnf("Cannot list BaseModels for same-path model reuse: %v", err)
}
}
return "", false
}

func isReadyModelEntry(dataEntry string) bool {
var entry ModelEntry
if err := json.Unmarshal([]byte(dataEntry), &entry); err != nil {
return false
}
return entry.Status == ModelStatusReady
}

func sameModelStoragePath(currentStorage *v1beta1.StorageSpec, candidateStorage *v1beta1.StorageSpec, modelRootDir string, destPath string) bool {
if currentStorage == nil || candidateStorage == nil || currentStorage.Path == nil || candidateStorage.Path == nil {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to handle the case where the getDestPath is empty?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add.
getDestPath already falls back to modelRootDir + "/" + storageUri when storage.path is empty, and reuse still requires os.Stat(destPath) to pass.

return false
}
if !sameStringPtr(currentStorage.StorageUri, candidateStorage.StorageUri) ||
!sameStringPtr(currentStorage.Path, candidateStorage.Path) ||
!sameStringPtr(currentStorage.SchemaPath, candidateStorage.SchemaPath) ||
!sameStringPtr(currentStorage.StorageKey, candidateStorage.StorageKey) {
return false
}
if !sameStringMapPtr(currentStorage.Parameters, candidateStorage.Parameters) {
return false
}

candidateSpec := v1beta1.BaseModelSpec{Storage: candidateStorage}
return getDestPath(&candidateSpec, modelRootDir) == destPath
}

func (s *Gopher) downloadModel(ctx context.Context, uri *ociobjectstore.ObjectURI, destPath string, task *GopherTask) error {
startTime := time.Now()
defer func() {
Expand Down
93 changes: 93 additions & 0 deletions pkg/modelagent/gopher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"testing"

"go.uber.org/zap/zaptest"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,6 +21,7 @@ import (

"sigs.k8s.io/ome/pkg/apis/ome/v1beta1"
omev1beta1lister "sigs.k8s.io/ome/pkg/client/listers/ome/v1beta1"
"sigs.k8s.io/ome/pkg/constants"
"sigs.k8s.io/ome/pkg/utils/storage"
)

Expand Down Expand Up @@ -661,10 +665,99 @@ func entryJSON(sha, parentName string, parentPath string) string {
return string(b)
}

func modelEntryJSON(status ModelStatus) string {
entry := ModelEntry{Status: status}
b, err := json.Marshal(entry)
if err != nil {
return ""
}
return string(b)
}

func dp(v v1beta1.DownloadPolicy) *v1beta1.DownloadPolicy {
return &v
}

func TestFindReadyObjectStorageModelWithSamePath(t *testing.T) {
storageURI := "oci://n/object-ns/b/model-bucket/o/models/large-model"
modelPath := filepath.Join(t.TempDir(), "large-model")
require.NoError(t, os.MkdirAll(modelPath, 0755))
readyKey := constants.GetModelConfigMapKey("default", "large-model", false)
current := &v1beta1.BaseModel{
ObjectMeta: metav1.ObjectMeta{Name: "large-model", Namespace: "service-ns"},
Spec: v1beta1.BaseModelSpec{
Storage: &v1beta1.StorageSpec{
StorageUri: &storageURI,
Path: &modelPath,
},
},
}

tests := []struct {
name string
entryStatus ModelStatus
candidatePath string
destPath string
wantReuse bool
}{
{
name: "ready model with same path",
entryStatus: ModelStatusReady,
candidatePath: modelPath,
destPath: modelPath,
wantReuse: true,
},
{
name: "updating model with same path",
entryStatus: ModelStatusUpdating,
candidatePath: modelPath,
destPath: modelPath,
},
{
name: "ready model with different path",
entryStatus: ModelStatusReady,
candidatePath: filepath.Join(t.TempDir(), "other-model"),
destPath: modelPath,
},
{
name: "ready model missing local path",
entryStatus: ModelStatusReady,
candidatePath: filepath.Join(t.TempDir(), "missing-model"),
destPath: filepath.Join(t.TempDir(), "missing-model"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := newGopherWithConfigMap(makeConfigMap("node-1", map[string]string{
readyKey: modelEntryJSON(tt.entryStatus),
}))
g.baseModelLister = &mockBaseModelLister{
models: []*v1beta1.BaseModel{
{
ObjectMeta: metav1.ObjectMeta{Name: "large-model", Namespace: "default"},
Spec: v1beta1.BaseModelSpec{
Storage: &v1beta1.StorageSpec{
StorageUri: &storageURI,
Path: &tt.candidatePath,
},
},
},
},
}

matchedKey, reused := g.findReadyObjectStorageModelWithSamePath(context.Background(), &GopherTask{BaseModel: current}, current.Spec, tt.destPath)

assert.Equal(t, tt.wantReuse, reused)
if tt.wantReuse {
assert.Equal(t, readyKey, matchedKey)
} else {
assert.Empty(t, matchedKey)
}
})
}
}

func TestHandelReuseArtifactIfNecessary_NoReusePolicy(t *testing.T) {
nodeName := "node-1"
// Even if CM has content, when policy is AlwaysDownload we should not reuse.
Expand Down
23 changes: 23 additions & 0 deletions pkg/modelagent/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package modelagent

func sameStringPtr(left *string, right *string) bool {
if left == nil || right == nil {
return left == right
}
return *left == *right
}

func sameStringMapPtr(left *map[string]string, right *map[string]string) bool {
if left == nil || right == nil {
return left == right
}
if len(*left) != len(*right) {
return false
}
for key, leftValue := range *left {
if rightValue, ok := (*right)[key]; !ok || rightValue != leftValue {
return false
}
}
return true
}
63 changes: 63 additions & 0 deletions pkg/modelagent/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package modelagent

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSameStringPtr(t *testing.T) {
left := "value"
same := "value"
different := "different"

tests := []struct {
name string
left *string
right *string
want bool
}{
{name: "both nil", want: true},
{name: "left nil", right: &same},
{name: "right nil", left: &left},
{name: "same value", left: &left, right: &same, want: true},
{name: "different value", left: &left, right: &different},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, sameStringPtr(tt.left, tt.right))
})
}
}

func TestSameStringMapPtr(t *testing.T) {
left := map[string]string{"region": "us-phoenix-1", "auth": "InstancePrincipal"}
same := map[string]string{"auth": "InstancePrincipal", "region": "us-phoenix-1"}
differentValue := map[string]string{"region": "us-ashburn-1", "auth": "InstancePrincipal"}
missingKey := map[string]string{"region": "us-phoenix-1"}
emptyLeft := map[string]string{}
emptyRight := map[string]string{}

tests := []struct {
name string
left *map[string]string
right *map[string]string
want bool
}{
{name: "both nil", want: true},
{name: "left nil", right: &same},
{name: "right nil", left: &left},
{name: "same entries", left: &left, right: &same, want: true},
{name: "different value", left: &left, right: &differentValue},
{name: "missing key", left: &left, right: &missingKey},
{name: "both empty maps", left: &emptyLeft, right: &emptyRight, want: true},
{name: "nil and empty map", right: &emptyRight},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, sameStringMapPtr(tt.left, tt.right))
})
}
}
Loading