diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index a5f41ae4bb5..15565cd60bb 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -8,6 +8,7 @@ import ( "os" "path" "path/filepath" + "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/artifacts/whl" @@ -17,6 +18,7 @@ import ( "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go" ) type mutatorFactory = func(name string) bundle.Mutator @@ -103,7 +105,7 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost return diag.FromErr(err) } - client, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath) + client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath) if err != nil { return diag.FromErr(err) } @@ -116,6 +118,17 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost return nil } +func getFilerForArtifacts(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) { + if isVolumesPath(uploadPath) { + return filer.NewFilesClient(w, uploadPath) + } + return filer.NewWorkspaceFilesClient(w, uploadPath) +} + +func isVolumesPath(path string) bool { + return strings.HasPrefix(path, "/Volumes/") +} + func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error { for i := range a.Files { f := &a.Files[i] @@ -130,14 +143,15 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u log.Infof(ctx, "Upload succeeded") f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source)) + remotePath := f.RemotePath - // TODO: confirm if we still need to update the remote path to start with /Workspace - wsfsBase := "/Workspace" - remotePath := path.Join(wsfsBase, f.RemotePath) + if !strings.HasPrefix(f.RemotePath, "/Workspace/") && !strings.HasPrefix(f.RemotePath, "/Volumes/") { + wsfsBase := "/Workspace" + remotePath = path.Join(wsfsBase, f.RemotePath) + } for _, job := range b.Config.Resources.Jobs { rewriteArtifactPath(b, f, job, remotePath) - } } diff --git a/bundle/artifacts/artifacts_test.go b/bundle/artifacts/artifacts_test.go index 53c2798ede4..6d85f3af908 100644 --- a/bundle/artifacts/artifacts_test.go +++ b/bundle/artifacts/artifacts_test.go @@ -17,7 +17,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestArtifactUpload(t *testing.T) { +func TestArtifactUploadForWorkspace(t *testing.T) { tmpDir := t.TempDir() whlFolder := filepath.Join(tmpDir, "whl") testutil.Touch(t, whlFolder, "source.whl") @@ -105,3 +105,92 @@ func TestArtifactUpload(t *testing.T) { require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl) require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl) } + +func TestArtifactUploadForVolumes(t *testing.T) { + tmpDir := t.TempDir() + whlFolder := filepath.Join(tmpDir, "whl") + testutil.Touch(t, whlFolder, "source.whl") + whlLocalPath := filepath.Join(whlFolder, "source.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/foo/bar/artifacts", + }, + Artifacts: config.Artifacts{ + "whl": { + Type: config.ArtifactPythonWheel, + Files: []config.ArtifactFile{ + {Source: whlLocalPath}, + }, + }, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Volumes/some/path/mywheel.whl", + }, + }, + }, + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Volumes/some/path/mywheel.whl", + }, + }, + }, + }, + }, + }, + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + filepath.Join("whl", "source.whl"), + "/Volumes/some/path/mywheel.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + artifact := b.Config.Artifacts["whl"] + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil) + + err := uploadArtifact(context.Background(), b, artifact, "/Volumes/foo/bar/artifacts", mockFiler) + require.NoError(t, err) + + // Test that libraries path is updated + require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl) + require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl) + require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) + require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) + require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl) + require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl) +} diff --git a/bundle/artifacts/build.go b/bundle/artifacts/build.go index 722891ada16..c8c3bf67ca0 100644 --- a/bundle/artifacts/build.go +++ b/bundle/artifacts/build.go @@ -44,27 +44,6 @@ func (m *build) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { } } - // Expand any glob reference in files source path - files := make([]config.ArtifactFile, 0, len(artifact.Files)) - for _, f := range artifact.Files { - matches, err := filepath.Glob(f.Source) - if err != nil { - return diag.Errorf("unable to find files for %s: %v", f.Source, err) - } - - if len(matches) == 0 { - return diag.Errorf("no files found for %s", f.Source) - } - - for _, match := range matches { - files = append(files, config.ArtifactFile{ - Source: match, - }) - } - } - - artifact.Files = files - // Skip building if build command is not specified or infered if artifact.BuildCommand == "" { // If no build command was specified or infered and there is no @@ -72,7 +51,11 @@ func (m *build) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { if len(artifact.Files) == 0 { return diag.Errorf("misconfigured artifact: please specify 'build' or 'files' property") } - return nil + + // We can skip calling build mutator if there is no build command + // But we still need to expand glob references in files source path. + diags := expandGlobReference(artifact) + return diags } // If artifact path is not provided, use bundle root dir @@ -85,5 +68,40 @@ func (m *build) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { artifact.Path = filepath.Join(dirPath, artifact.Path) } - return bundle.Apply(ctx, b, getBuildMutator(artifact.Type, m.name)) + diags := bundle.Apply(ctx, b, getBuildMutator(artifact.Type, m.name)) + if diags.HasError() { + return diags + } + + // We need to expand glob reference after build mutator is applied because + // if we do it before, any files that are generated by build command will + // not be included into artifact.Files and thus will not be uploaded. + d := expandGlobReference(artifact) + return diags.Extend(d) +} + +func expandGlobReference(artifact *config.Artifact) diag.Diagnostics { + var diags diag.Diagnostics + + // Expand any glob reference in files source path + files := make([]config.ArtifactFile, 0, len(artifact.Files)) + for _, f := range artifact.Files { + matches, err := filepath.Glob(f.Source) + if err != nil { + return diags.Extend(diag.Errorf("unable to find files for %s: %v", f.Source, err)) + } + + if len(matches) == 0 { + return diags.Extend(diag.Errorf("no files found for %s", f.Source)) + } + + for _, match := range matches { + files = append(files, config.ArtifactFile{ + Source: match, + }) + } + } + + artifact.Files = files + return diags } diff --git a/bundle/artifacts/upload.go b/bundle/artifacts/upload.go index 5c12c944497..3af50021e83 100644 --- a/bundle/artifacts/upload.go +++ b/bundle/artifacts/upload.go @@ -6,7 +6,8 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" - "github.com/databricks/databricks-sdk-go/service/workspace" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" ) func UploadAll() bundle.Mutator { @@ -57,12 +58,18 @@ func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics return diag.FromErr(err) } - b.WorkspaceClient().Workspace.Delete(ctx, workspace.Delete{ - Path: uploadPath, - Recursive: true, - }) + client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath) + if err != nil { + return diag.FromErr(err) + } + + // We intentionally ignore the error because it is not critical to the deployment + err = client.Delete(ctx, ".", filer.DeleteRecursively) + if err != nil { + log.Errorf(ctx, "failed to delete %s: %v", uploadPath, err) + } - err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, uploadPath) + err = client.Mkdir(ctx, ".") if err != nil { return diag.Errorf("unable to create directory for %s: %v", uploadPath, err) } diff --git a/internal/bundle/artifacts_test.go b/internal/bundle/artifacts_test.go index 222b2304743..46c236a4e95 100644 --- a/internal/bundle/artifacts_test.go +++ b/internal/bundle/artifacts_test.go @@ -153,3 +153,72 @@ func TestAccUploadArtifactFileToCorrectRemotePathWithEnvironments(t *testing.T) b.Config.Resources.Jobs["test"].JobSettings.Environments[0].Spec.Dependencies[0], ) } + +func TestAccUploadArtifactFileToCorrectRemotePathForVolumes(t *testing.T) { + ctx, wt := acc.WorkspaceTest(t) + w := wt.W + + if os.Getenv("TEST_METASTORE_ID") == "" { + t.Skip("Skipping tests that require a UC Volume when metastore id is not set.") + } + + volumePath := internal.TemporaryUcVolume(t, w) + + dir := t.TempDir() + whlPath := filepath.Join(dir, "dist", "test.whl") + touchEmptyFile(t, whlPath) + + b := &bundle.Bundle{ + RootPath: dir, + Config: config.Root{ + Bundle: config.Bundle{ + Target: "whatever", + }, + Workspace: config.Workspace{ + ArtifactPath: volumePath, + }, + Artifacts: config.Artifacts{ + "test": &config.Artifact{ + Type: "whl", + Files: []config.ArtifactFile{ + { + Source: whlPath, + }, + }, + }, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "test": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: "dist/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test")) + require.NoError(t, diags.Error()) + + // The remote path attribute on the artifact file should have been set. + require.Regexp(t, + regexp.MustCompile(path.Join(regexp.QuoteMeta(volumePath), `.internal/test\.whl`)), + b.Config.Artifacts["test"].Files[0].RemotePath, + ) + + // The task library path should have been updated to the remote path. + require.Regexp(t, + regexp.MustCompile(path.Join(regexp.QuoteMeta(volumePath), `.internal/test\.whl`)), + b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl, + ) +} diff --git a/internal/bundle/bundles/spark_jar_task/databricks_template_schema.json b/internal/bundle/bundles/spark_jar_task/databricks_template_schema.json new file mode 100644 index 00000000000..078dff976cf --- /dev/null +++ b/internal/bundle/bundles/spark_jar_task/databricks_template_schema.json @@ -0,0 +1,29 @@ +{ + "properties": { + "project_name": { + "type": "string", + "default": "my_java_project", + "description": "Unique name for this project" + }, + "spark_version": { + "type": "string", + "description": "Spark version used for job cluster" + }, + "node_type_id": { + "type": "string", + "description": "Node type id for job cluster" + }, + "unique_id": { + "type": "string", + "description": "Unique ID for job name" + }, + "root": { + "type": "string", + "description": "Path to the root of the template" + }, + "artifact_path": { + "type": "string", + "description": "Path to the remote base path for artifacts" + } + } +} diff --git a/internal/bundle/bundles/spark_jar_task/template/databricks.yml.tmpl b/internal/bundle/bundles/spark_jar_task/template/databricks.yml.tmpl new file mode 100644 index 00000000000..24a6d7d8a9b --- /dev/null +++ b/internal/bundle/bundles/spark_jar_task/template/databricks.yml.tmpl @@ -0,0 +1,28 @@ +bundle: + name: spark-jar-task + +workspace: + root_path: "~/.bundle/{{.unique_id}}" + artifact_path: {{.artifact_path}} + +artifacts: + my_java_code: + path: ./{{.project_name}} + build: "javac PrintArgs.java && jar cvfm PrintArgs.jar META-INF/MANIFEST.MF PrintArgs.class" + files: + - source: ./{{.project_name}}/PrintArgs.jar + +resources: + jobs: + jar_job: + name: "[${bundle.target}] Test Spark Jar Job {{.unique_id}}" + tasks: + - task_key: TestSparkJarTask + new_cluster: + num_workers: 1 + spark_version: "{{.spark_version}}" + node_type_id: "{{.node_type_id}}" + spark_jar_task: + main_class_name: PrintArgs + libraries: + - jar: ./{{.project_name}}/PrintArgs.jar diff --git a/internal/bundle/bundles/spark_jar_task/template/{{.project_name}}/META-INF/MANIFEST.MF b/internal/bundle/bundles/spark_jar_task/template/{{.project_name}}/META-INF/MANIFEST.MF new file mode 100644 index 00000000000..40b023dbd60 --- /dev/null +++ b/internal/bundle/bundles/spark_jar_task/template/{{.project_name}}/META-INF/MANIFEST.MF @@ -0,0 +1 @@ +Main-Class: PrintArgs \ No newline at end of file diff --git a/internal/bundle/bundles/spark_jar_task/template/{{.project_name}}/PrintArgs.java b/internal/bundle/bundles/spark_jar_task/template/{{.project_name}}/PrintArgs.java new file mode 100644 index 00000000000..b7430f25f37 --- /dev/null +++ b/internal/bundle/bundles/spark_jar_task/template/{{.project_name}}/PrintArgs.java @@ -0,0 +1,8 @@ +import java.util.Arrays; + +public class PrintArgs { + public static void main(String[] args) { + System.out.println("Hello from Jar!"); + System.out.println(Arrays.toString(args)); + } +} diff --git a/internal/bundle/helpers.go b/internal/bundle/helpers.go index a17964b167f..c33c1533132 100644 --- a/internal/bundle/helpers.go +++ b/internal/bundle/helpers.go @@ -21,9 +21,13 @@ import ( const defaultSparkVersion = "13.3.x-snapshot-scala2.12" func initTestTemplate(t *testing.T, ctx context.Context, templateName string, config map[string]any) (string, error) { + bundleRoot := t.TempDir() + return initTestTemplateWithBundleRoot(t, ctx, templateName, config, bundleRoot) +} + +func initTestTemplateWithBundleRoot(t *testing.T, ctx context.Context, templateName string, config map[string]any, bundleRoot string) (string, error) { templateRoot := filepath.Join("bundles", templateName) - bundleRoot := t.TempDir() configFilePath, err := writeConfigFile(t, config) if err != nil { return "", err diff --git a/internal/bundle/spark_jar_test.go b/internal/bundle/spark_jar_test.go new file mode 100644 index 00000000000..c981e775044 --- /dev/null +++ b/internal/bundle/spark_jar_test.go @@ -0,0 +1,52 @@ +package bundle + +import ( + "os" + "testing" + + "github.com/databricks/cli/internal" + "github.com/databricks/cli/internal/acc" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func runSparkJarTest(t *testing.T, sparkVersion string) { + t.Skip("Temporarily skipping the test until auth / permission issues for UC volumes are resolved.") + + env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV") + t.Log(env) + + if os.Getenv("TEST_METASTORE_ID") == "" { + t.Skip("Skipping tests that require a UC Volume when metastore id is not set.") + } + + ctx, wt := acc.WorkspaceTest(t) + w := wt.W + volumePath := internal.TemporaryUcVolume(t, w) + + nodeTypeId := internal.GetNodeTypeId(env) + tmpDir := t.TempDir() + bundleRoot, err := initTestTemplateWithBundleRoot(t, ctx, "spark_jar_task", map[string]any{ + "node_type_id": nodeTypeId, + "unique_id": uuid.New().String(), + "spark_version": sparkVersion, + "root": tmpDir, + "artifact_path": volumePath, + }, tmpDir) + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + t.Cleanup(func() { + destroyBundle(t, ctx, bundleRoot) + }) + + out, err := runResource(t, ctx, bundleRoot, "jar_job") + require.NoError(t, err) + require.Contains(t, out, "Hello from Jar!") +} + +func TestAccSparkJarTaskDeployAndRunOnVolumes(t *testing.T) { + runSparkJarTest(t, "14.3.x-scala2.12") +} diff --git a/internal/helpers.go b/internal/helpers.go index 67a258ba407..972a2322b52 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -472,7 +472,7 @@ func TemporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { } // Create a new UC volume in a catalog called "main" in the workspace. -func temporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string { +func TemporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() // Create a schema @@ -607,7 +607,7 @@ func setupUcVolumesFiler(t *testing.T) (filer.Filer, string) { w, err := databricks.NewWorkspaceClient() require.NoError(t, err) - tmpDir := temporaryUcVolume(t, w) + tmpDir := TemporaryUcVolume(t, w) f, err := filer.NewFilesClient(w, tmpDir) require.NoError(t, err)