Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The Databricks SDK for Go includes functionality to accelerate development with
- [Paginated responses](#paginated-responses)
- [GetByName utility methods](#getbyname-utility-methods)
- [Node type and Databricks Runtime selectors](#node-type-and-databricks-runtime-selectors)
- [io.Reader integration for DBFS](#ioreader-integration-for-dbfs)
- [Integration with `io` interfaces for DBFS](#integration-with-io-interfaces-for-dbfs)
- [Logging](#logging)
- [Interface stability](#interface-stability)

Expand Down Expand Up @@ -477,17 +477,44 @@ runningCluster, err := w.Clusters.CreateAndWait(ctx, clusters.CreateCluster{
})
```

## `io.Reader` integration for DBFS
## Integration with `io` interfaces for DBFS
Comment thread
pietern marked this conversation as resolved.

Use the higher-level `w.Dbfs.Open` and `w.Dbfs.Overwrite` methods to work with remote files through the `io.Reader` interface. Internally, these methods wrap the low-level intricacies of working with Databricks REST APIs, providing a convenient interface to you as a developer.
You can open a file on DBFS for reading or writing with `w.Dbfs.Open`.
This function returns a `dbfs.Handle` that is compatible with a subset of `io`
interfaces for reading, writing, and closing.

Uploading a file from an `io.Reader`:

```go
upload, _ := os.Open("/path/to/local/file.ext")
_ = w.Dbfs.Overwrite(ctx, "/path/to/remote/file", upload)
remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite|dbfs.FileModeOverwrite)
_, _ = io.Copy(remote, upload)
_ = remote.Close()
```

Downloading a file to an `io.Writer`:

```go
download, _ := os.Create("/path/to/local")
remote, _ := w.Dbfs.Open(ctx, "/path/to/remote")
_ = io.Copy(download, remote)
remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeRead)
_, _ = io.Copy(download, remote)
```

### Reading into and writing from buffers

You can read from or write to a DBFS file directly from a byte slice through
the convenience functions `w.Dbfs.ReadFile` and `w.Dbfs.WriteFile`.

Uploading a file from a byte slice:

```go
err := w.Dbfs.WriteFile(ctx, "/path/to/remote/file", []byte("Hello world!"))
```

Downloading a file into a byte slice:

```go
buf, err := w.Dbfs.ReadFile(ctx, "/path/to/remote/file")
```

## `pflag.Value` for enums
Expand Down
109 changes: 89 additions & 20 deletions internal/dbfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ import (
"github.com/stretchr/testify/require"
)

func TestAccDbfsUtilities(t *testing.T) {
type hashable []byte

func (buf hashable) Hash() uint32 {
h := fnv.New32a()
h.Write(buf)
return h.Sum32()
}

func TestAccDbfsOpen(t *testing.T) {
ctx, w := workspaceTest(t)
if w.Config.IsGcp() {
t.Skip("dbfs not available on gcp")
Expand All @@ -26,47 +34,108 @@ func TestAccDbfsUtilities(t *testing.T) {
rand.Seed(time.Now().UnixNano())
in := make([]byte, 1.44*1e6)
_, _ = rand.Read(in)
h := fnv.New32a()
h.Write(in)
inHash := h.Sum32()

err := w.Dbfs.Overwrite(ctx, path, bytes.NewReader(in))
require.NoError(t, err)

defer w.Dbfs.Delete(ctx, dbfs.Delete{
Path: path,
})

// Download directly [io.Reader] and let [io.ReadAll] determine buffer size.
// Upload through [io.Writer].
{
Comment thread
nfx marked this conversation as resolved.
handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite)
require.NoError(t, err)
n, err := handle.Write(in)
require.NoError(t, err)
assert.Equal(t, len(in), int(n))
require.NoError(t, handle.Close())

// Verify contents hash.
out, err := w.Dbfs.ReadFile(ctx, path)
require.NoError(t, err)
assert.Equal(t, hashable(in).Hash(), hashable(out).Hash())
}

// Upload through [io.Writer] should fail because the file exists.
{
_, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite)
require.ErrorContains(t, err, "dbfs open: A file or directory already exists at the input path")
}

// Upload through [io.ReadFrom] with overwrite bit set.
{
handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite|dbfs.FileModeOverwrite)
require.NoError(t, err)
n, err := handle.ReadFrom(bytes.NewReader(in))
require.NoError(t, err)
assert.Equal(t, len(in), int(n))
require.NoError(t, handle.Close())

// Verify contents hash.
out, err := w.Dbfs.ReadFile(ctx, path)
require.NoError(t, err)
assert.Equal(t, hashable(in).Hash(), hashable(out).Hash())
}

// Download through [io.Reader] and let [io.ReadAll] determine buffer size.
{
dbfsReader, err := w.Dbfs.Open(ctx, path)
handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeRead)
require.NoError(t, err)

// Note: [io.ReadAll] always calls into the [io.Reader] interface.
out, err := io.ReadAll(dbfsReader)
out, err := io.ReadAll(handle)
require.NoError(t, err)

// Verify contents hash.
h := fnv.New32a()
h.Write(out)
require.Equal(t, inHash, h.Sum32())
assert.Equal(t, hashable(in).Hash(), hashable(out).Hash())
}

// Download through [io.WriterTo] with maximum buffer size.
// Download through [io.WriterTo].
{
dbfsReader, err := w.Dbfs.Open(ctx, path)
handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeRead)
require.NoError(t, err)

// Note: [io.Copy] leverages the [io.WriterTo] interface if available.
var buf bytes.Buffer
_, err = io.Copy(&buf, dbfsReader)
_, err = handle.WriteTo(&buf)
require.NoError(t, err)

// Verify contents hash.
h := fnv.New32a()
h.Write(buf.Bytes())
require.Equal(t, inHash, h.Sum32())
assert.Equal(t, hashable(in).Hash(), hashable(buf.Bytes()).Hash())
}
}

func TestAccDbfsReadFileWriteFile(t *testing.T) {
ctx, w := workspaceTest(t)
if w.Config.IsGcp() {
t.Skip("dbfs not available on gcp")
}

path := RandomName("/tmp/.sdk/fake")
rand.Seed(time.Now().UnixNano())
in := make([]byte, 1.44*1e6)
_, _ = rand.Read(in)

defer w.Dbfs.Delete(ctx, dbfs.Delete{
Path: path,
})

// Write file to DBFS.
err := w.Dbfs.WriteFile(ctx, path, in)
require.NoError(t, err)

// Verify contents hash.
out, err := w.Dbfs.ReadFile(ctx, path)
require.NoError(t, err)
assert.Equal(t, hashable(in).Hash(), hashable(out).Hash())

hello := []byte("Hello world!")

// Writing to the same path should truncate the existing file.
err = w.Dbfs.WriteFile(ctx, path, hello)
require.NoError(t, err)

// Verify contents hash.
out, err = w.Dbfs.ReadFile(ctx, path)
require.NoError(t, err)
assert.Equal(t, hashable(hello).Hash(), hashable(out).Hash())
}

func TestAccListDbfsIntegration(t *testing.T) {
Expand Down
46 changes: 31 additions & 15 deletions service/dbfs/doc.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,49 @@
// Databricks FileSystem (DBFS) API
// Databricks File System (DBFS) API
//
// We strongly recommend using clients created via
// [github.com/databricks/databricks-sdk-go/workspaces.New] to simplify
// configuration experience.
// We recommend using a client created via [databricks.NewWorkspaceClient]
// to simplify the configuration experience.
//
// Please use the high-level [DbfsAPI.Open] and [DbfsAPI.Overwrite] methods
// to work with remote files through Go's [io] interfaces. The return value
// of [DbfsAPI.Open] implements the [io.Reader] and [io.WriterTo] interfaces.
// The [io.WriterTo] interface is used by [io.Copy] and maximizes throughput by
// reading data with the DBFS maximum read chunk size of 1MB.
// # Reading and writing files
//
// Internally, these methods wrap the low level [DbfsAPI.Create],
// [DbfsAPI.Close], [DbfsAPI.Read], and [DbfsAPI.AddBlock] methods:
// You can open a file on DBFS for reading or writing with [DbfsAPI.Open].
// This function returns a [Handle] that is compatible with a subset of [io]
// interfaces for reading, writing, and closing.
//
// Uploading a file from an [io.Reader]:
//
// upload, _ := os.Open("/path/to/local/file.ext")
// _ = w.Dbfs.Overwrite(ctx, "/path/to/remote/file", upload)
// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite|dbfs.FileModeOverwrite)
// io.Copy(remote, upload)
// remote.Close()
//
// Downloading a file to an [io.Writer]:
//
// download, _ := os.Create("/path/to/local")
// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote")
// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeRead)
// _ = io.Copy(download, remote)
//
// Moving files:
// # Reading and writing files from buffers
//
// You can read from or write to a DBFS file directly from a byte slice through
// the convenience functions [DbfsAPI.ReadFile] and [DbfsAPI.WriteFile].
//
// Uploading a file from a byte slice:
//
// buf := []byte("Hello world!")
// _ = w.Dbfs.WriteFile(ctx, "/path/to/remote/file", buf)
//
// Downloading a file into a byte slice:
//
// buf, err := w.Dbfs.ReadFile(ctx, "/path/to/remote/file")
//
// # Moving files
//
// err := w.Dbfs.Move(ctx, dbfs.Move{
// SourcePath: "/remote/src/path",
// DestinationPath: "/remote/dst/path",
// })
//
// Creating directories:
// # Creating directories
//
// w.Dbfs.MkdirsByPath(ctx, "/remote/dir/path")
package dbfs
Loading