From 51dc2a79a006269e494063ad119a3f5ed0d42a0d Mon Sep 17 00:00:00 2001 From: Chengjie Yao Date: Wed, 20 May 2026 17:45:49 -0700 Subject: [PATCH 1/2] Skip object storage download for Ready same-path artifacts --- pkg/modelagent/gopher.go | 151 +++++++++++++++++++++++++++++----- pkg/modelagent/gopher_test.go | 93 +++++++++++++++++++++ 2 files changed, 222 insertions(+), 22 deletions(-) diff --git a/pkg/modelagent/gopher.go b/pkg/modelagent/gopher.go index 6f4c802e..2725107f 100644 --- a/pkg/modelagent/gopher.go +++ b/pkg/modelagent/gopher.go @@ -2,6 +2,7 @@ package modelagent import ( "context" + "encoding/json" "fmt" "os" "path/filepath" @@ -324,31 +325,35 @@ func (s *Gopher) processTask(task *GopherTask) error { s.logger.Errorf("Failed to get target directory path for model %s: %v", modelInfo, err) return err } - err = utils.Retry(s.downloadRetry, 100*time.Millisecond, func() error { - downloadErr := s.downloadModel(ctx, osUri, destPath, task) - if downloadErr != nil { - // Check if context was cancelled - if ctx.Err() != nil { - s.logger.Infof("Download cancelled for model %s: %v", modelInfo, ctx.Err()) - return ctx.Err() + if matchedKey, reused := s.findReadyObjectStorageModelWithSamePath(ctx, task, baseModelSpec, destPath); reused { + s.logger.Infof("Reusing Ready same-path model artifact for %s/%s from %s at %s", namespace, name, matchedKey, destPath) + } else { + err = utils.Retry(s.downloadRetry, 100*time.Millisecond, func() error { + downloadErr := s.downloadModel(ctx, osUri, destPath, task) + if downloadErr != nil { + // Check if context was cancelled + if ctx.Err() != nil { + s.logger.Infof("Download cancelled for model %s: %v", modelInfo, ctx.Err()) + return ctx.Err() + } + s.logger.Errorf("Failed to download model %s (attempt %d/%d): %v", + modelInfo, s.downloadRetry, s.downloadRetry, downloadErr) } - s.logger.Errorf("Failed to download model %s (attempt %d/%d): %v", - modelInfo, s.downloadRetry, s.downloadRetry, downloadErr) - } - return downloadErr - }) - if err != nil { - s.logger.Errorf("All download attempts failed for model %s: %v", modelInfo, err) + return downloadErr + }) + if err != nil { + s.logger.Errorf("All download attempts failed for model %s: %v", modelInfo, err) - // Record download failure in metrics - errorType := "download_error" - if strings.Contains(err.Error(), "MD5") { - errorType = "md5_verification_error" - } - s.metrics.RecordFailedDownload(modelType, namespace, name, errorType) + // Record download failure in metrics + errorType := "download_error" + if strings.Contains(err.Error(), "MD5") { + errorType = "md5_verification_error" + } + s.metrics.RecordFailedDownload(modelType, namespace, name, errorType) - s.markModelOnNodeFailed(task) - return err + s.markModelOnNodeFailed(task) + return err + } } // Parse model config and update ConfigMap // We can pass either BaseModel or ClusterBaseModel based on the task's model type @@ -736,6 +741,108 @@ func (s *Gopher) createOCIOSDataStore(baseModelSpec v1beta1.BaseModelSpec) (*oci return ociOSDS, nil } +func (s *Gopher) findReadyObjectStorageModelWithSamePath(ctx context.Context, task *GopherTask, baseModelSpec v1beta1.BaseModelSpec, destPath string) (string, bool) { + if task == nil || (task.BaseModel == nil && task.ClusterBaseModel == nil) { + return "", false + } + if baseModelSpec.Storage == nil || baseModelSpec.Storage.StorageUri == nil || baseModelSpec.Storage.Path == nil { + return "", false + } + if s.configMapReconciler == nil { + return "", false + } + if _, err := os.Stat(destPath); err != nil { + s.logger.Warnf("Cannot reuse same-path model artifact at %s because it is not available locally: %v", destPath, err) + return "", false + } + + configMap, err := s.configMapReconciler.getConfigMap(ctx) + if err != nil { + s.logger.Warnf("Cannot inspect node ConfigMap for same-path model reuse: %v", err) + return "", false + } + + currentKey := s.configMapReconciler.getModelConfigMapKey(task.BaseModel, task.ClusterBaseModel) + if s.clusterBaseModelLister != nil { + clusterBaseModels, err := s.clusterBaseModelLister.List(labels.Everything()) + if err == nil { + for _, model := range clusterBaseModels { + key := constants.GetModelConfigMapKey("", model.Name, true) + if key != currentKey && isReadyModelEntry(configMap.Data[key]) && + sameModelStoragePath(baseModelSpec.Storage, model.Spec.Storage, s.modelRootDir, destPath) { + return key, true + } + } + } else { + s.logger.Warnf("Cannot list ClusterBaseModels for same-path model reuse: %v", err) + } + } + + if s.baseModelLister != nil { + baseModels, err := s.baseModelLister.List(labels.Everything()) + if err == nil { + for _, model := range baseModels { + key := constants.GetModelConfigMapKey(model.Namespace, model.Name, false) + if key != currentKey && isReadyModelEntry(configMap.Data[key]) && + sameModelStoragePath(baseModelSpec.Storage, model.Spec.Storage, s.modelRootDir, destPath) { + return key, true + } + } + } else { + s.logger.Warnf("Cannot list BaseModels for same-path model reuse: %v", err) + } + } + return "", false +} + +func isReadyModelEntry(dataEntry string) bool { + var entry ModelEntry + if err := json.Unmarshal([]byte(dataEntry), &entry); err != nil { + return false + } + return entry.Status == ModelStatusReady +} + +func sameModelStoragePath(currentStorage *v1beta1.StorageSpec, candidateStorage *v1beta1.StorageSpec, modelRootDir string, destPath string) bool { + if currentStorage == nil || candidateStorage == nil || currentStorage.Path == nil || candidateStorage.Path == nil { + return false + } + if !sameStringPtr(currentStorage.StorageUri, candidateStorage.StorageUri) || + !sameStringPtr(currentStorage.Path, candidateStorage.Path) || + !sameStringPtr(currentStorage.SchemaPath, candidateStorage.SchemaPath) || + !sameStringPtr(currentStorage.StorageKey, candidateStorage.StorageKey) { + return false + } + if !sameStringMapPtr(currentStorage.Parameters, candidateStorage.Parameters) { + return false + } + + candidateSpec := v1beta1.BaseModelSpec{Storage: candidateStorage} + return getDestPath(&candidateSpec, modelRootDir) == destPath +} + +func sameStringPtr(left *string, right *string) bool { + if left == nil || right == nil { + return left == right + } + return *left == *right +} + +func sameStringMapPtr(left *map[string]string, right *map[string]string) bool { + if left == nil || right == nil { + return left == right + } + if len(*left) != len(*right) { + return false + } + for key, leftValue := range *left { + if rightValue, ok := (*right)[key]; !ok || rightValue != leftValue { + return false + } + } + return true +} + func (s *Gopher) downloadModel(ctx context.Context, uri *ociobjectstore.ObjectURI, destPath string, task *GopherTask) error { startTime := time.Now() defer func() { diff --git a/pkg/modelagent/gopher_test.go b/pkg/modelagent/gopher_test.go index 76604297..06f12a23 100644 --- a/pkg/modelagent/gopher_test.go +++ b/pkg/modelagent/gopher_test.go @@ -5,11 +5,14 @@ import ( "encoding/json" "errors" "fmt" + "os" + "path/filepath" "testing" "go.uber.org/zap/zaptest" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,6 +21,7 @@ import ( "sigs.k8s.io/ome/pkg/apis/ome/v1beta1" omev1beta1lister "sigs.k8s.io/ome/pkg/client/listers/ome/v1beta1" + "sigs.k8s.io/ome/pkg/constants" "sigs.k8s.io/ome/pkg/utils/storage" ) @@ -661,10 +665,99 @@ func entryJSON(sha, parentName string, parentPath string) string { return string(b) } +func modelEntryJSON(status ModelStatus) string { + entry := ModelEntry{Status: status} + b, err := json.Marshal(entry) + if err != nil { + return "" + } + return string(b) +} + func dp(v v1beta1.DownloadPolicy) *v1beta1.DownloadPolicy { return &v } +func TestFindReadyObjectStorageModelWithSamePath(t *testing.T) { + storageURI := "oci://n/object-ns/b/model-bucket/o/models/large-model" + modelPath := filepath.Join(t.TempDir(), "large-model") + require.NoError(t, os.MkdirAll(modelPath, 0755)) + readyKey := constants.GetModelConfigMapKey("default", "large-model", false) + current := &v1beta1.BaseModel{ + ObjectMeta: metav1.ObjectMeta{Name: "large-model", Namespace: "service-ns"}, + Spec: v1beta1.BaseModelSpec{ + Storage: &v1beta1.StorageSpec{ + StorageUri: &storageURI, + Path: &modelPath, + }, + }, + } + + tests := []struct { + name string + entryStatus ModelStatus + candidatePath string + destPath string + wantReuse bool + }{ + { + name: "ready model with same path", + entryStatus: ModelStatusReady, + candidatePath: modelPath, + destPath: modelPath, + wantReuse: true, + }, + { + name: "updating model with same path", + entryStatus: ModelStatusUpdating, + candidatePath: modelPath, + destPath: modelPath, + }, + { + name: "ready model with different path", + entryStatus: ModelStatusReady, + candidatePath: filepath.Join(t.TempDir(), "other-model"), + destPath: modelPath, + }, + { + name: "ready model missing local path", + entryStatus: ModelStatusReady, + candidatePath: filepath.Join(t.TempDir(), "missing-model"), + destPath: filepath.Join(t.TempDir(), "missing-model"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := newGopherWithConfigMap(makeConfigMap("node-1", map[string]string{ + readyKey: modelEntryJSON(tt.entryStatus), + })) + g.baseModelLister = &mockBaseModelLister{ + models: []*v1beta1.BaseModel{ + { + ObjectMeta: metav1.ObjectMeta{Name: "large-model", Namespace: "default"}, + Spec: v1beta1.BaseModelSpec{ + Storage: &v1beta1.StorageSpec{ + StorageUri: &storageURI, + Path: &tt.candidatePath, + }, + }, + }, + }, + } + + matchedKey, reused := g.findReadyObjectStorageModelWithSamePath(context.Background(), &GopherTask{BaseModel: current}, current.Spec, tt.destPath) + + assert.Equal(t, tt.wantReuse, reused) + if tt.wantReuse { + assert.Equal(t, readyKey, matchedKey) + } else { + assert.Empty(t, matchedKey) + } + }) + } +} + func TestHandelReuseArtifactIfNecessary_NoReusePolicy(t *testing.T) { nodeName := "node-1" // Even if CM has content, when policy is AlwaysDownload we should not reuse. From 475bd25b1e143758f8c887945b1497142aa749aa Mon Sep 17 00:00:00 2001 From: Chengjie Yao Date: Mon, 22 Jun 2026 19:33:17 -0700 Subject: [PATCH 2/2] Move string comparison helpers to modelagent utils and add comments --- pkg/modelagent/gopher.go | 27 +++------------- pkg/modelagent/utils.go | 23 +++++++++++++ pkg/modelagent/utils_test.go | 63 ++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 22 deletions(-) create mode 100644 pkg/modelagent/utils.go create mode 100644 pkg/modelagent/utils_test.go diff --git a/pkg/modelagent/gopher.go b/pkg/modelagent/gopher.go index 2725107f..9b757b46 100644 --- a/pkg/modelagent/gopher.go +++ b/pkg/modelagent/gopher.go @@ -741,6 +741,11 @@ func (s *Gopher) createOCIOSDataStore(baseModelSpec v1beta1.BaseModelSpec) (*oci return ociOSDS, nil } +// findReadyObjectStorageModelWithSamePath looks for a Ready OCI Object Storage +// model entry on this node that resolves to the same local destination path. +// This is intentionally independent of downloadPolicy: copied model CRs with +// the same source and destination can reuse Ready local files, while normal +// downloadPolicy behavior still applies when no same-path Ready artifact exists. func (s *Gopher) findReadyObjectStorageModelWithSamePath(ctx context.Context, task *GopherTask, baseModelSpec v1beta1.BaseModelSpec, destPath string) (string, bool) { if task == nil || (task.BaseModel == nil && task.ClusterBaseModel == nil) { return "", false @@ -821,28 +826,6 @@ func sameModelStoragePath(currentStorage *v1beta1.StorageSpec, candidateStorage return getDestPath(&candidateSpec, modelRootDir) == destPath } -func sameStringPtr(left *string, right *string) bool { - if left == nil || right == nil { - return left == right - } - return *left == *right -} - -func sameStringMapPtr(left *map[string]string, right *map[string]string) bool { - if left == nil || right == nil { - return left == right - } - if len(*left) != len(*right) { - return false - } - for key, leftValue := range *left { - if rightValue, ok := (*right)[key]; !ok || rightValue != leftValue { - return false - } - } - return true -} - func (s *Gopher) downloadModel(ctx context.Context, uri *ociobjectstore.ObjectURI, destPath string, task *GopherTask) error { startTime := time.Now() defer func() { diff --git a/pkg/modelagent/utils.go b/pkg/modelagent/utils.go new file mode 100644 index 00000000..3b873038 --- /dev/null +++ b/pkg/modelagent/utils.go @@ -0,0 +1,23 @@ +package modelagent + +func sameStringPtr(left *string, right *string) bool { + if left == nil || right == nil { + return left == right + } + return *left == *right +} + +func sameStringMapPtr(left *map[string]string, right *map[string]string) bool { + if left == nil || right == nil { + return left == right + } + if len(*left) != len(*right) { + return false + } + for key, leftValue := range *left { + if rightValue, ok := (*right)[key]; !ok || rightValue != leftValue { + return false + } + } + return true +} diff --git a/pkg/modelagent/utils_test.go b/pkg/modelagent/utils_test.go new file mode 100644 index 00000000..8d3aec6d --- /dev/null +++ b/pkg/modelagent/utils_test.go @@ -0,0 +1,63 @@ +package modelagent + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSameStringPtr(t *testing.T) { + left := "value" + same := "value" + different := "different" + + tests := []struct { + name string + left *string + right *string + want bool + }{ + {name: "both nil", want: true}, + {name: "left nil", right: &same}, + {name: "right nil", left: &left}, + {name: "same value", left: &left, right: &same, want: true}, + {name: "different value", left: &left, right: &different}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, sameStringPtr(tt.left, tt.right)) + }) + } +} + +func TestSameStringMapPtr(t *testing.T) { + left := map[string]string{"region": "us-phoenix-1", "auth": "InstancePrincipal"} + same := map[string]string{"auth": "InstancePrincipal", "region": "us-phoenix-1"} + differentValue := map[string]string{"region": "us-ashburn-1", "auth": "InstancePrincipal"} + missingKey := map[string]string{"region": "us-phoenix-1"} + emptyLeft := map[string]string{} + emptyRight := map[string]string{} + + tests := []struct { + name string + left *map[string]string + right *map[string]string + want bool + }{ + {name: "both nil", want: true}, + {name: "left nil", right: &same}, + {name: "right nil", left: &left}, + {name: "same entries", left: &left, right: &same, want: true}, + {name: "different value", left: &left, right: &differentValue}, + {name: "missing key", left: &left, right: &missingKey}, + {name: "both empty maps", left: &emptyLeft, right: &emptyRight, want: true}, + {name: "nil and empty map", right: &emptyRight}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, sameStringMapPtr(tt.left, tt.right)) + }) + } +}