From effffb6f7d1ab27a8936ac273c089495850a3206 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 19 Dec 2022 15:14:30 +0100 Subject: [PATCH 01/11] DBFS file handle that supports both reading and writing Follow up to #249. --- service/dbfs/handle.go | 225 ++++++++++++++++++++++++++++++++++++++ service/dbfs/utilities.go | 112 ++----------------- 2 files changed, 237 insertions(+), 100 deletions(-) create mode 100644 service/dbfs/handle.go diff --git a/service/dbfs/handle.go b/service/dbfs/handle.go new file mode 100644 index 000000000..aa90c4cd8 --- /dev/null +++ b/service/dbfs/handle.go @@ -0,0 +1,225 @@ +package dbfs + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + + "github.com/databricks/databricks-sdk-go/useragent" +) + +// DbfsFileMode conveys user intent when opening a file. +type DbfsFileMode int + +const ( + // Exactly one of DbfsRead or DbfsWrite must be specified. + DbfsRead DbfsFileMode = 1 << iota + DbfsWrite + DbfsOverwrite +) + +// Maximum read or write length for the DBFS API. +const maxDbfsBlockSize = 1024 * 1024 + +type dbfsReader struct { + size int64 + offset int64 +} + +type dbfsWriter struct { + handle int64 +} + +type dbfsHandle struct { + ctx context.Context + api *DbfsAPI + path string + + *dbfsReader + *dbfsWriter +} + +// Implement the [io.Reader] interface. +func (h *dbfsHandle) Read(p []byte) (int, error) { + r := h.dbfsReader + if r == nil { + return 0, fmt.Errorf("dbfs file not open for reading") + } + + if r.offset >= r.size { + return 0, io.EOF + } + + res, err := h.api.Read(h.ctx, Read{ + Path: h.path, + Length: len(p), + Offset: int(r.offset), // TODO: make int32/in64 work properly + }) + if err != nil { + return 0, fmt.Errorf("dbfs read: %w", err) + } + + // The guard against offset >= size happens above, so this can only happen + // if the file is modified or truncated while reading. If this happens, + // the read contents will likely be corrupted, so we return an error. + if res.BytesRead == 0 { + return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.size) + } + + r.offset += res.BytesRead + return base64.StdEncoding.Decode(p, []byte(res.Data)) +} + +// Implement the [io.WriterTo] interface. +func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { + r := h.dbfsReader + if r == nil { + return 0, fmt.Errorf("dbfs file not open for reading") + } + + buf := make([]byte, maxDbfsBlockSize) + ntotal := int64(0) + for { + nread, err := h.Read(buf) + if err != nil { + // EOF on read means we're done. + // For writers being done means returning a nil error. + if err == io.EOF { + err = nil + } + return ntotal, err + } + nwritten, err := io.Copy(w, bytes.NewReader(buf[:nread])) + ntotal += nwritten + if err != nil { + return ntotal, err + } + } +} + +// Implement the [io.Writer] interface. +func (h *dbfsHandle) Write(p []byte) (int, error) { + w := h.dbfsWriter + if w == nil { + return 0, fmt.Errorf("dbfs: file not open for writing") + } + + err := h.api.AddBlock(h.ctx, AddBlock{ + Data: base64.StdEncoding.EncodeToString(p), + Handle: w.handle, + }) + if err != nil { + return 0, fmt.Errorf("dbfs: add block: %w", err) + } + return len(p), nil +} + +// Implement the [io.ReaderFrom] interface. +func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { + w := h.dbfsWriter + if w == nil { + return 0, fmt.Errorf("dbfs: file not open for writing") + } + + buf := make([]byte, maxDbfsBlockSize) + ntotal := int64(0) + for { + nread, err := r.Read(buf) + if err != nil { + // EOF on read means we're done. + // For writers being done means returning a nil error. + if err == io.EOF { + err = nil + } + return ntotal, err + } + + nwritten, err := h.Write(buf[:nread]) + ntotal += int64(nwritten) + if err != nil { + return ntotal, err + } + } +} + +// Implement the [io.Closer] interface. +func (h *dbfsHandle) Close() error { + w := h.dbfsWriter + if w == nil { + return fmt.Errorf("dbfs: file not open for writing") + } + + err := h.api.CloseByHandle(h.ctx, w.handle) + if err != nil { + return fmt.Errorf("dbfs: close: %w", err) + } + + return nil +} + +func (h *dbfsHandle) openForRead(mode DbfsFileMode) error { + res, err := h.api.GetStatusByPath(h.ctx, h.path) + if err != nil { + return err + } + h.dbfsReader = &dbfsReader{ + size: res.FileSize, + } + return nil +} + +func (h *dbfsHandle) openForWrite(mode DbfsFileMode) error { + res, err := h.api.Create(h.ctx, Create{ + Path: h.path, + Overwrite: (mode & DbfsOverwrite) != 0, + }) + if err != nil { + return err + } + h.dbfsWriter = &dbfsWriter{ + handle: res.Handle, + } + return nil +} + +// OpenFile opens a remote DBFS file for reading or writing. +// The returned object implements relevant [io] interfaces for convenient +// integration with other code that reads or writes bytes. +// +// The [io.WriterTo] interface is provided and maximizes throughput for +// bulk reads by reading data with the DBFS maximum read chunk size of 1MB. +// Similarly, the [io.ReaderFrom] interface is provided for bulk writing. +// +// A file opened for writing must always be closed. +func OpenFile(ctx context.Context, api *DbfsAPI, path string, mode DbfsFileMode) (*dbfsHandle, error) { + h := &dbfsHandle{ + ctx: useragent.InContext(ctx, "sdk-feature", "dbfs-handle"), + api: api, + path: path, + } + + isRead := (mode & DbfsRead) != 0 + isWrite := (mode & DbfsWrite) != 0 + if isRead && isWrite { + return nil, fmt.Errorf("dbfs: cannot open file for reading and writing at the same time") + } + + var err error + switch { + case isRead: + err = h.openForRead(mode) + case isWrite: + err = h.openForWrite(mode) + default: + // No mode specifed. The caller should be explicit so we return an error. + return nil, fmt.Errorf("dbfs: must specify DbfsRead or DbfsWrite") + } + + if err != nil { + return nil, fmt.Errorf("dbfs open: %w", err) + } + + return h, nil +} diff --git a/service/dbfs/utilities.go b/service/dbfs/utilities.go index 5b7a9452a..d2faafa3f 100644 --- a/service/dbfs/utilities.go +++ b/service/dbfs/utilities.go @@ -1,9 +1,7 @@ package dbfs import ( - "bytes" "context" - "encoding/base64" "fmt" "io" @@ -11,116 +9,30 @@ import ( "github.com/databricks/databricks-sdk-go/useragent" ) -var b64 = base64.StdEncoding - // Overwrite is like Put, but more friendly func (a *DbfsAPI) Overwrite(ctx context.Context, path string, r io.Reader) (err error) { ctx = useragent.InContext(ctx, "sdk-feature", "dbfs-overwrite") - handle, err := a.Create(ctx, Create{ - Path: path, - Overwrite: true, - }) + handle, err := OpenFile(ctx, a, path, DbfsWrite|DbfsOverwrite) if err != nil { - return fmt.Errorf("create: %w", err) - } - defer func() { - cerr := a.CloseByHandle(ctx, handle.Handle) - if cerr != nil { - err = fmt.Errorf("close: %w", cerr) - } - }() - buffer := make([]byte, 1e6) - for { - n, err := r.Read(buffer) - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("read: %w", err) - } - err = a.AddBlock(ctx, AddBlock{ - Data: b64.EncodeToString(buffer[0:n]), - Handle: handle.Handle, - }) - if err != nil { - return fmt.Errorf("add block: %w", err) - } - } - return err -} - -type FileReader struct { - Size int64 - ctx context.Context - api *DbfsAPI - path string - offset int64 -} - -func (r *FileReader) Read(p []byte) (n int, err error) { - if r.api == nil { - panic("invalid call") - } - if r.offset >= r.Size { - return 0, io.EOF + return err } - resp, err := r.api.Read(r.ctx, Read{ - Path: r.path, - Length: len(p), - Offset: int(r.offset), // TODO: make int32/in64 work properly - }) + _, err = handle.ReadFrom(r) + cerr := handle.Close() if err != nil { - return 0, fmt.Errorf("dbfs read: %w", err) + return err } - // The guard against offset >= size happens above, so this can only happen - // if the file is modified or truncated while reading. If this happens, - // the read contents will likely be corrupted, so we return an error. - if resp.BytesRead == 0 { - return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.Size) + if cerr != nil { + return cerr } - r.offset += resp.BytesRead - return b64.Decode(p, []byte(resp.Data)) + return nil } -// Maximum read length for the DBFS read API (see [DbfsApi.Read]). -const maxDbfsReadSize = 1024 * 1024 - -// WriteTo makes [FileReader] implement the [io.WriterTo] interface. -// This can be used with [io.Copy] to maximize throughput, as -// it uses the maximum buffer size allowed by the DBFS API. -func (r *FileReader) WriteTo(w io.Writer) (n int64, err error) { - buf := make([]byte, maxDbfsReadSize) - nwritten := int64(0) - for { - n, err := r.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return nwritten, err - } - n64, err := io.Copy(w, bytes.NewReader(buf[:n])) - nwritten += n64 - if err != nil { - return nwritten, err - } - } +func (a *DbfsAPI) Open(ctx context.Context, path string) (*dbfsHandle, error) { + return OpenFile(ctx, a, path, DbfsRead) } -func (a *DbfsAPI) Open(ctx context.Context, path string) (*FileReader, error) { - ctx = useragent.InContext(ctx, "sdk-feature", "dbfs-open") - info, err := a.GetStatusByPath(ctx, path) - if err != nil { - return nil, fmt.Errorf("get status: %w", err) - } - return &FileReader{ - Size: info.FileSize, - path: path, - ctx: ctx, - api: a, - }, nil +func (a *DbfsAPI) OpenFile(ctx context.Context, path string, mode DbfsFileMode) (*dbfsHandle, error) { + return OpenFile(ctx, a, path, mode) } // RecursiveList traverses the DBFS tree and returns all non-directory From c117ef1474f0ae6276bc414e84384bc10f7b70ef Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 20 Dec 2022 19:15:45 +0100 Subject: [PATCH 02/11] Address comments --- internal/dbfs_test.go | 84 ++++++++++--- service/dbfs/handle.go | 225 --------------------------------- service/dbfs/utilities.go | 257 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 309 insertions(+), 257 deletions(-) delete mode 100644 service/dbfs/handle.go diff --git a/internal/dbfs_test.go b/internal/dbfs_test.go index e965d2ed0..ba86d70cd 100644 --- a/internal/dbfs_test.go +++ b/internal/dbfs_test.go @@ -16,7 +16,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestAccDbfsUtilities(t *testing.T) { +type hashable []byte + +func (buf hashable) Hash() uint32 { + h := fnv.New32a() + h.Write(buf) + return h.Sum32() +} + +func TestAccDbfsHandleWrite(t *testing.T) { ctx, w := workspaceTest(t) if w.Config.IsGcp() { t.Skip("dbfs not available on gcp") @@ -26,46 +34,82 @@ func TestAccDbfsUtilities(t *testing.T) { rand.Seed(time.Now().UnixNano()) in := make([]byte, 1.44*1e6) _, _ = rand.Read(in) - h := fnv.New32a() - h.Write(in) - inHash := h.Sum32() - - err := w.Dbfs.Overwrite(ctx, path, bytes.NewReader(in)) - require.NoError(t, err) defer w.Dbfs.Delete(ctx, dbfs.Delete{ Path: path, }) - // Download directly [io.Reader] and let [io.ReadAll] determine buffer size. + // Upload through [io.Writer]. + { + handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite) + require.NoError(t, err) + n, err := handle.Write(in) + require.NoError(t, err) + assert.Equal(t, len(in), int(n)) + require.NoError(t, handle.Close()) + + // Verify contents hash. + out, err := w.Dbfs.ReadFile(ctx, path) + require.NoError(t, err) + assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) + } + + // Upload through [io.Writer] should fail because the file exists. + { + _, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite) + require.ErrorContains(t, err, "dbfs: open: A file or directory already exists at the input path") + } + + // Upload through [io.ReadFrom] with overwrite bit set. + { + handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite|dbfs.FileModeOverwrite) + require.NoError(t, err) + n, err := handle.ReadFrom(bytes.NewReader(in)) + require.NoError(t, err) + assert.Equal(t, len(in), int(n)) + require.NoError(t, handle.Close()) + + // Verify contents hash. + out, err := w.Dbfs.ReadFile(ctx, path) + require.NoError(t, err) + assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) + } + + // Upload through [dbfs.DbfsAPI.WriteFile]. + { + err := w.Dbfs.WriteFile(ctx, path, in) + require.NoError(t, err) + + // Verify contents hash. + out, err := w.Dbfs.ReadFile(ctx, path) + require.NoError(t, err) + assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) + } + + // Download through [io.Reader] and let [io.ReadAll] determine buffer size. { - dbfsReader, err := w.Dbfs.Open(ctx, path) + handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeRead) require.NoError(t, err) // Note: [io.ReadAll] always calls into the [io.Reader] interface. - out, err := io.ReadAll(dbfsReader) + out, err := io.ReadAll(handle) require.NoError(t, err) // Verify contents hash. - h := fnv.New32a() - h.Write(out) - require.Equal(t, inHash, h.Sum32()) + assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) } - // Download through [io.WriterTo] with maximum buffer size. + // Download through [io.WriterTo]. { - dbfsReader, err := w.Dbfs.Open(ctx, path) + handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeRead) require.NoError(t, err) - // Note: [io.Copy] leverages the [io.WriterTo] interface if available. var buf bytes.Buffer - _, err = io.Copy(&buf, dbfsReader) + _, err = handle.WriteTo(&buf) require.NoError(t, err) // Verify contents hash. - h := fnv.New32a() - h.Write(buf.Bytes()) - require.Equal(t, inHash, h.Sum32()) + assert.Equal(t, hashable(in).Hash(), hashable(buf.Bytes()).Hash()) } } diff --git a/service/dbfs/handle.go b/service/dbfs/handle.go deleted file mode 100644 index aa90c4cd8..000000000 --- a/service/dbfs/handle.go +++ /dev/null @@ -1,225 +0,0 @@ -package dbfs - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - "io" - - "github.com/databricks/databricks-sdk-go/useragent" -) - -// DbfsFileMode conveys user intent when opening a file. -type DbfsFileMode int - -const ( - // Exactly one of DbfsRead or DbfsWrite must be specified. - DbfsRead DbfsFileMode = 1 << iota - DbfsWrite - DbfsOverwrite -) - -// Maximum read or write length for the DBFS API. -const maxDbfsBlockSize = 1024 * 1024 - -type dbfsReader struct { - size int64 - offset int64 -} - -type dbfsWriter struct { - handle int64 -} - -type dbfsHandle struct { - ctx context.Context - api *DbfsAPI - path string - - *dbfsReader - *dbfsWriter -} - -// Implement the [io.Reader] interface. -func (h *dbfsHandle) Read(p []byte) (int, error) { - r := h.dbfsReader - if r == nil { - return 0, fmt.Errorf("dbfs file not open for reading") - } - - if r.offset >= r.size { - return 0, io.EOF - } - - res, err := h.api.Read(h.ctx, Read{ - Path: h.path, - Length: len(p), - Offset: int(r.offset), // TODO: make int32/in64 work properly - }) - if err != nil { - return 0, fmt.Errorf("dbfs read: %w", err) - } - - // The guard against offset >= size happens above, so this can only happen - // if the file is modified or truncated while reading. If this happens, - // the read contents will likely be corrupted, so we return an error. - if res.BytesRead == 0 { - return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.size) - } - - r.offset += res.BytesRead - return base64.StdEncoding.Decode(p, []byte(res.Data)) -} - -// Implement the [io.WriterTo] interface. -func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { - r := h.dbfsReader - if r == nil { - return 0, fmt.Errorf("dbfs file not open for reading") - } - - buf := make([]byte, maxDbfsBlockSize) - ntotal := int64(0) - for { - nread, err := h.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return ntotal, err - } - nwritten, err := io.Copy(w, bytes.NewReader(buf[:nread])) - ntotal += nwritten - if err != nil { - return ntotal, err - } - } -} - -// Implement the [io.Writer] interface. -func (h *dbfsHandle) Write(p []byte) (int, error) { - w := h.dbfsWriter - if w == nil { - return 0, fmt.Errorf("dbfs: file not open for writing") - } - - err := h.api.AddBlock(h.ctx, AddBlock{ - Data: base64.StdEncoding.EncodeToString(p), - Handle: w.handle, - }) - if err != nil { - return 0, fmt.Errorf("dbfs: add block: %w", err) - } - return len(p), nil -} - -// Implement the [io.ReaderFrom] interface. -func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { - w := h.dbfsWriter - if w == nil { - return 0, fmt.Errorf("dbfs: file not open for writing") - } - - buf := make([]byte, maxDbfsBlockSize) - ntotal := int64(0) - for { - nread, err := r.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return ntotal, err - } - - nwritten, err := h.Write(buf[:nread]) - ntotal += int64(nwritten) - if err != nil { - return ntotal, err - } - } -} - -// Implement the [io.Closer] interface. -func (h *dbfsHandle) Close() error { - w := h.dbfsWriter - if w == nil { - return fmt.Errorf("dbfs: file not open for writing") - } - - err := h.api.CloseByHandle(h.ctx, w.handle) - if err != nil { - return fmt.Errorf("dbfs: close: %w", err) - } - - return nil -} - -func (h *dbfsHandle) openForRead(mode DbfsFileMode) error { - res, err := h.api.GetStatusByPath(h.ctx, h.path) - if err != nil { - return err - } - h.dbfsReader = &dbfsReader{ - size: res.FileSize, - } - return nil -} - -func (h *dbfsHandle) openForWrite(mode DbfsFileMode) error { - res, err := h.api.Create(h.ctx, Create{ - Path: h.path, - Overwrite: (mode & DbfsOverwrite) != 0, - }) - if err != nil { - return err - } - h.dbfsWriter = &dbfsWriter{ - handle: res.Handle, - } - return nil -} - -// OpenFile opens a remote DBFS file for reading or writing. -// The returned object implements relevant [io] interfaces for convenient -// integration with other code that reads or writes bytes. -// -// The [io.WriterTo] interface is provided and maximizes throughput for -// bulk reads by reading data with the DBFS maximum read chunk size of 1MB. -// Similarly, the [io.ReaderFrom] interface is provided for bulk writing. -// -// A file opened for writing must always be closed. -func OpenFile(ctx context.Context, api *DbfsAPI, path string, mode DbfsFileMode) (*dbfsHandle, error) { - h := &dbfsHandle{ - ctx: useragent.InContext(ctx, "sdk-feature", "dbfs-handle"), - api: api, - path: path, - } - - isRead := (mode & DbfsRead) != 0 - isWrite := (mode & DbfsWrite) != 0 - if isRead && isWrite { - return nil, fmt.Errorf("dbfs: cannot open file for reading and writing at the same time") - } - - var err error - switch { - case isRead: - err = h.openForRead(mode) - case isWrite: - err = h.openForWrite(mode) - default: - // No mode specifed. The caller should be explicit so we return an error. - return nil, fmt.Errorf("dbfs: must specify DbfsRead or DbfsWrite") - } - - if err != nil { - return nil, fmt.Errorf("dbfs open: %w", err) - } - - return h, nil -} diff --git a/service/dbfs/utilities.go b/service/dbfs/utilities.go index d2faafa3f..c264d8080 100644 --- a/service/dbfs/utilities.go +++ b/service/dbfs/utilities.go @@ -1,7 +1,9 @@ package dbfs import ( + "bufio" "context" + "encoding/base64" "fmt" "io" @@ -9,30 +11,261 @@ import ( "github.com/databricks/databricks-sdk-go/useragent" ) -// Overwrite is like Put, but more friendly -func (a *DbfsAPI) Overwrite(ctx context.Context, path string, r io.Reader) (err error) { - ctx = useragent.InContext(ctx, "sdk-feature", "dbfs-overwrite") - handle, err := OpenFile(ctx, a, path, DbfsWrite|DbfsOverwrite) +// FileMode conveys user intent when opening a file. +type FileMode int + +const ( + // Exactly one of FileModeRead or FileModeWrite must be specified. + FileModeRead FileMode = 1 << iota + FileModeWrite + FileModeOverwrite +) + +// Maximum read or write length for the DBFS API. +const maxDbfsBlockSize = 1024 * 1024 + +// Internal only state for a read handle. +type dbfsHandleReader struct { + size int64 + offset int64 +} + +// Internal only state for a write handle. +type dbfsHandleWriter struct { + handle int64 +} + +// Internal only state for a DBFS file handle. +type dbfsHandle struct { + ctx context.Context + api DbfsAPI + path string + + reader *dbfsHandleReader + writer *dbfsHandleWriter +} + +// Handle defines the interface of the object returned by [DbfsAPI.Open]. +type Handle interface { + io.ReadWriteCloser + io.WriterTo + io.ReaderFrom +} + +// Implement the [io.Reader] interface. +func (h *dbfsHandle) Read(p []byte) (int, error) { + r := h.reader + if r == nil { + return 0, fmt.Errorf("dbfs: file not open for reading") + } + + var ntotal int + for ntotal < len(p) { + if r.offset >= r.size { + return ntotal, io.EOF + } + + chunk := p[ntotal:] + if len(chunk) > maxDbfsBlockSize { + chunk = chunk[:maxDbfsBlockSize] + } + + res, err := h.api.Read(h.ctx, Read{ + Path: h.path, + Length: len(chunk), + Offset: int(r.offset), // TODO: make int32/in64 work properly + }) + if err != nil { + return ntotal, fmt.Errorf("dbfs: read: %w", err) + } + + // The guard against offset >= size happens above, so this can only happen + // if the file is modified or truncated while reading. If this happens, + // the read contents will likely be corrupted, so we return an error. + if res.BytesRead == 0 { + return ntotal, fmt.Errorf("dbfs: read: unexpected EOF at offset %d (size %d)", r.offset, r.size) + } + + nread, err := base64.StdEncoding.Decode(chunk, []byte(res.Data)) + if err != nil { + return ntotal, err + } + + ntotal += nread + r.offset += int64(nread) + } + + return ntotal, nil +} + +// Implement the [io.Writer] interface. +func (h *dbfsHandle) Write(p []byte) (int, error) { + w := h.writer + if w == nil { + return 0, fmt.Errorf("dbfs: file not open for writing") + } + + var ntotal int + for ntotal < len(p) { + chunk := p[ntotal:] + if len(chunk) > maxDbfsBlockSize { + chunk = chunk[:maxDbfsBlockSize] + } + + err := h.api.AddBlock(h.ctx, AddBlock{ + Data: base64.StdEncoding.EncodeToString(chunk), + Handle: w.handle, + }) + if err != nil { + return ntotal, fmt.Errorf("dbfs: add block: %w", err) + } + + ntotal += len(chunk) + } + + return ntotal, nil +} + +// Implement the [io.Closer] interface. +func (h *dbfsHandle) Close() error { + w := h.writer + if w == nil { + return fmt.Errorf("dbfs: file not open for writing") + } + + err := h.api.CloseByHandle(h.ctx, w.handle) + if err != nil { + return fmt.Errorf("dbfs: close: %w", err) + } + + return nil +} + +// Implement the [io.WriterTo] interface. +func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { + if h.reader == nil { + return 0, fmt.Errorf("dbfs: file not open for reading") + } + + // Limit types to io.Reader and io.Writer to avoid recursion + // into WriteTo or ReadFrom functions on underlying types. + ior := struct{ io.Reader }{h} + iow := struct{ io.Writer }{w} + return bufio.NewReaderSize(ior, maxDbfsBlockSize).WriteTo(iow) +} + +// Implement the [io.ReaderFrom] interface. +func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { + if h.writer == nil { + return 0, fmt.Errorf("dbfs: file not open for writing") + } + + // Limit types to io.Reader and io.Writer to avoid recursion + // into WriteTo or ReadFrom functions on underlying types. + ior := struct{ io.Reader }{r} + iow := struct{ io.Writer }{h} + bw := bufio.NewWriterSize(iow, maxDbfsBlockSize) + n, err := bw.ReadFrom(ior) + if err != nil { + return n, err + } + return n, bw.Flush() +} + +func (h *dbfsHandle) openForRead(mode FileMode) error { + res, err := h.api.GetStatusByPath(h.ctx, h.path) if err != nil { return err } - _, err = handle.ReadFrom(r) - cerr := handle.Close() + h.reader = &dbfsHandleReader{ + size: res.FileSize, + } + return nil +} + +func (h *dbfsHandle) openForWrite(mode FileMode) error { + res, err := h.api.Create(h.ctx, Create{ + Path: h.path, + Overwrite: (mode & FileModeOverwrite) != 0, + }) if err != nil { return err } - if cerr != nil { - return cerr + h.writer = &dbfsHandleWriter{ + handle: res.Handle, } return nil } -func (a *DbfsAPI) Open(ctx context.Context, path string) (*dbfsHandle, error) { - return OpenFile(ctx, a, path, DbfsRead) +// Open opens a remote DBFS file for reading or writing. +// The returned object implements relevant [io] interfaces for convenient +// integration with other code that reads or writes bytes. +// +// The [io.WriterTo] interface is provided and maximizes throughput for +// bulk reads by reading data with the DBFS maximum read chunk size of 1MB. +// Similarly, the [io.ReaderFrom] interface is provided for bulk writing. +// +// A file opened for writing must always be closed. +func (a DbfsAPI) Open(ctx context.Context, path string, mode FileMode) (Handle, error) { + h := &dbfsHandle{ + ctx: useragent.InContext(ctx, "sdk-feature", "dbfs-handle"), + api: a, + path: path, + } + + isRead := (mode & FileModeRead) != 0 + isWrite := (mode & FileModeWrite) != 0 + if isRead && isWrite { + return nil, fmt.Errorf("dbfs: cannot open file for reading and writing at the same time") + } + + var err error + switch { + case isRead: + err = h.openForRead(mode) + case isWrite: + err = h.openForWrite(mode) + default: + // No mode specifed. The caller should be explicit so we return an error. + return nil, fmt.Errorf("dbfs: must specify dbfs.FileModeRead or dbfs.FileModeWrite") + } + + if err != nil { + return nil, fmt.Errorf("dbfs: open: %w", err) + } + + return h, nil } -func (a *DbfsAPI) OpenFile(ctx context.Context, path string, mode DbfsFileMode) (*dbfsHandle, error) { - return OpenFile(ctx, a, path, mode) +// ReadFile is identical to [os.ReadFile] but for DBFS. +func (a DbfsAPI) ReadFile(ctx context.Context, name string) ([]byte, error) { + h, err := a.Open(ctx, name, FileModeRead) + if err != nil { + return nil, err + } + + h_ := h.(*dbfsHandle) + buf := make([]byte, h_.reader.size) + _, err = h.Read(buf) + if err != nil && err != io.EOF { + return nil, err + } + return buf, nil +} + +// WriteFile is identical to [os.WriteFile] but for DBFS. +func (a DbfsAPI) WriteFile(ctx context.Context, name string, data []byte) error { + h, err := a.Open(ctx, name, FileModeWrite|FileModeOverwrite) + if err != nil { + return err + } + + _, err = h.Write(data) + cerr := h.Close() + if err == nil && cerr != nil { + err = cerr + } + return err } // RecursiveList traverses the DBFS tree and returns all non-directory From dc6cff91f071caeb0a867dab0fd2f7a07e870a5d Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 21 Dec 2022 14:23:30 +0100 Subject: [PATCH 03/11] First round of comments --- service/dbfs/utilities.go | 133 +++++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 52 deletions(-) diff --git a/service/dbfs/utilities.go b/service/dbfs/utilities.go index c264d8080..8bec112d2 100644 --- a/service/dbfs/utilities.go +++ b/service/dbfs/utilities.go @@ -25,24 +25,60 @@ const ( const maxDbfsBlockSize = 1024 * 1024 // Internal only state for a read handle. -type dbfsHandleReader struct { +type fileHandleReader struct { size int64 offset int64 } +func (f *fileHandleReader) errorf(format string, a ...any) error { + return fmt.Errorf("dbfs read: "+format, a...) +} + +func (f *fileHandleReader) error(err error) error { + if err == nil { + return nil + } + return f.errorf("%w", err) +} + // Internal only state for a write handle. -type dbfsHandleWriter struct { +type fileHandleWriter struct { handle int64 } +func (f *fileHandleWriter) errorf(format string, a ...any) error { + return fmt.Errorf("dbfs write: "+format, a...) +} + +func (f *fileHandleWriter) error(err error) error { + if err == nil { + return nil + } + return f.errorf("%w", err) +} + // Internal only state for a DBFS file handle. -type dbfsHandle struct { +type fileHandle struct { ctx context.Context - api DbfsAPI + api *DbfsAPI path string - reader *dbfsHandleReader - writer *dbfsHandleWriter + reader *fileHandleReader + writer *fileHandleWriter +} + +func (h *fileHandle) checkRead() (*fileHandleReader, error) { + if h.reader != nil { + return h.reader, nil + } + return nil, fmt.Errorf("dbfs: file not open for reading") +} + +func (h *fileHandle) checkWrite() (*fileHandleWriter, error) { + if h.writer != nil { + return h.writer, nil + } + return nil, fmt.Errorf("dbfs: file not open for writing") } // Handle defines the interface of the object returned by [DbfsAPI.Open]. @@ -53,10 +89,10 @@ type Handle interface { } // Implement the [io.Reader] interface. -func (h *dbfsHandle) Read(p []byte) (int, error) { - r := h.reader - if r == nil { - return 0, fmt.Errorf("dbfs: file not open for reading") +func (h *fileHandle) Read(p []byte) (int, error) { + r, err := h.checkRead() + if err != nil { + return 0, err } var ntotal int @@ -76,19 +112,19 @@ func (h *dbfsHandle) Read(p []byte) (int, error) { Offset: int(r.offset), // TODO: make int32/in64 work properly }) if err != nil { - return ntotal, fmt.Errorf("dbfs: read: %w", err) + return ntotal, r.error(err) } // The guard against offset >= size happens above, so this can only happen // if the file is modified or truncated while reading. If this happens, // the read contents will likely be corrupted, so we return an error. if res.BytesRead == 0 { - return ntotal, fmt.Errorf("dbfs: read: unexpected EOF at offset %d (size %d)", r.offset, r.size) + return ntotal, r.errorf("unexpected EOF at offset %d (size %d)", r.offset, r.size) } nread, err := base64.StdEncoding.Decode(chunk, []byte(res.Data)) if err != nil { - return ntotal, err + return ntotal, r.error(err) } ntotal += nread @@ -99,10 +135,10 @@ func (h *dbfsHandle) Read(p []byte) (int, error) { } // Implement the [io.Writer] interface. -func (h *dbfsHandle) Write(p []byte) (int, error) { - w := h.writer - if w == nil { - return 0, fmt.Errorf("dbfs: file not open for writing") +func (h *fileHandle) Write(p []byte) (int, error) { + w, err := h.checkWrite() + if err != nil { + return 0, err } var ntotal int @@ -117,7 +153,7 @@ func (h *dbfsHandle) Write(p []byte) (int, error) { Handle: w.handle, }) if err != nil { - return ntotal, fmt.Errorf("dbfs: add block: %w", err) + return ntotal, w.error(err) } ntotal += len(chunk) @@ -127,24 +163,20 @@ func (h *dbfsHandle) Write(p []byte) (int, error) { } // Implement the [io.Closer] interface. -func (h *dbfsHandle) Close() error { - w := h.writer - if w == nil { - return fmt.Errorf("dbfs: file not open for writing") - } - - err := h.api.CloseByHandle(h.ctx, w.handle) +func (h *fileHandle) Close() error { + w, err := h.checkWrite() if err != nil { - return fmt.Errorf("dbfs: close: %w", err) + return err } - return nil + return w.error(h.api.CloseByHandle(h.ctx, w.handle)) } // Implement the [io.WriterTo] interface. -func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { - if h.reader == nil { - return 0, fmt.Errorf("dbfs: file not open for reading") +func (h *fileHandle) WriteTo(w io.Writer) (int64, error) { + _, err := h.checkRead() + if err != nil { + return 0, err } // Limit types to io.Reader and io.Writer to avoid recursion @@ -155,9 +187,10 @@ func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { } // Implement the [io.ReaderFrom] interface. -func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { - if h.writer == nil { - return 0, fmt.Errorf("dbfs: file not open for writing") +func (h *fileHandle) ReadFrom(r io.Reader) (int64, error) { + _, err := h.checkWrite() + if err != nil { + return 0, err } // Limit types to io.Reader and io.Writer to avoid recursion @@ -172,18 +205,18 @@ func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { return n, bw.Flush() } -func (h *dbfsHandle) openForRead(mode FileMode) error { +func (h *fileHandle) openForRead(mode FileMode) error { res, err := h.api.GetStatusByPath(h.ctx, h.path) if err != nil { return err } - h.reader = &dbfsHandleReader{ + h.reader = &fileHandleReader{ size: res.FileSize, } return nil } -func (h *dbfsHandle) openForWrite(mode FileMode) error { +func (h *fileHandle) openForWrite(mode FileMode) error { res, err := h.api.Create(h.ctx, Create{ Path: h.path, Overwrite: (mode & FileModeOverwrite) != 0, @@ -191,7 +224,7 @@ func (h *dbfsHandle) openForWrite(mode FileMode) error { if err != nil { return err } - h.writer = &dbfsHandleWriter{ + h.writer = &fileHandleWriter{ handle: res.Handle, } return nil @@ -206,8 +239,8 @@ func (h *dbfsHandle) openForWrite(mode FileMode) error { // Similarly, the [io.ReaderFrom] interface is provided for bulk writing. // // A file opened for writing must always be closed. -func (a DbfsAPI) Open(ctx context.Context, path string, mode FileMode) (Handle, error) { - h := &dbfsHandle{ +func (a *DbfsAPI) Open(ctx context.Context, path string, mode FileMode) (Handle, error) { + h := &fileHandle{ ctx: useragent.InContext(ctx, "sdk-feature", "dbfs-handle"), api: a, path: path, @@ -215,36 +248,32 @@ func (a DbfsAPI) Open(ctx context.Context, path string, mode FileMode) (Handle, isRead := (mode & FileModeRead) != 0 isWrite := (mode & FileModeWrite) != 0 - if isRead && isWrite { - return nil, fmt.Errorf("dbfs: cannot open file for reading and writing at the same time") + if (isRead && isWrite) || (!isRead && !isWrite) { + return nil, fmt.Errorf("dbfs: must specify dbfs.FileModeRead or dbfs.FileModeWrite") } var err error - switch { - case isRead: + if isRead { err = h.openForRead(mode) - case isWrite: + } + if isWrite { err = h.openForWrite(mode) - default: - // No mode specifed. The caller should be explicit so we return an error. - return nil, fmt.Errorf("dbfs: must specify dbfs.FileModeRead or dbfs.FileModeWrite") } - if err != nil { - return nil, fmt.Errorf("dbfs: open: %w", err) + return nil, fmt.Errorf("dbfs: %w", err) } return h, nil } // ReadFile is identical to [os.ReadFile] but for DBFS. -func (a DbfsAPI) ReadFile(ctx context.Context, name string) ([]byte, error) { +func (a *DbfsAPI) ReadFile(ctx context.Context, name string) ([]byte, error) { h, err := a.Open(ctx, name, FileModeRead) if err != nil { return nil, err } - h_ := h.(*dbfsHandle) + h_ := h.(*fileHandle) buf := make([]byte, h_.reader.size) _, err = h.Read(buf) if err != nil && err != io.EOF { @@ -254,7 +283,7 @@ func (a DbfsAPI) ReadFile(ctx context.Context, name string) ([]byte, error) { } // WriteFile is identical to [os.WriteFile] but for DBFS. -func (a DbfsAPI) WriteFile(ctx context.Context, name string, data []byte) error { +func (a *DbfsAPI) WriteFile(ctx context.Context, name string, data []byte) error { h, err := a.Open(ctx, name, FileModeWrite|FileModeOverwrite) if err != nil { return err From 69c3f10bace861bdbe008ddf194dace65d267b8f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 21 Dec 2022 14:35:35 +0100 Subject: [PATCH 04/11] Prefix with "dbfs open" --- service/dbfs/utilities.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/dbfs/utilities.go b/service/dbfs/utilities.go index 8bec112d2..566f5240b 100644 --- a/service/dbfs/utilities.go +++ b/service/dbfs/utilities.go @@ -249,7 +249,7 @@ func (a *DbfsAPI) Open(ctx context.Context, path string, mode FileMode) (Handle, isRead := (mode & FileModeRead) != 0 isWrite := (mode & FileModeWrite) != 0 if (isRead && isWrite) || (!isRead && !isWrite) { - return nil, fmt.Errorf("dbfs: must specify dbfs.FileModeRead or dbfs.FileModeWrite") + return nil, fmt.Errorf("dbfs open: must specify dbfs.FileModeRead or dbfs.FileModeWrite") } var err error @@ -260,7 +260,7 @@ func (a *DbfsAPI) Open(ctx context.Context, path string, mode FileMode) (Handle, err = h.openForWrite(mode) } if err != nil { - return nil, fmt.Errorf("dbfs: %w", err) + return nil, fmt.Errorf("dbfs open: %w", err) } return h, nil From 918566d6a48b657687c4b772426c889a379a4a80 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 21 Dec 2022 16:14:09 +0100 Subject: [PATCH 05/11] Docs --- README.md | 35 ++++++++++++++++++++++++++++++---- service/dbfs/doc.go | 46 ++++++++++++++++++++++++++++++--------------- 2 files changed, 62 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 77badc778..f63237989 100644 --- a/README.md +++ b/README.md @@ -477,19 +477,46 @@ runningCluster, err := w.Clusters.CreateAndWait(ctx, clusters.CreateCluster{ }) ``` -## `io.Reader` integration for DBFS +## Integration with `io` interfaces for DBFS -Use the higher-level `w.Dbfs.Open` and `w.Dbfs.Overwrite` methods to work with remote files through the `io.Reader` interface. Internally, these methods wrap the low-level intricacies of working with Databricks REST APIs, providing a convenient interface to you as a developer. +You can open a file on DBFS for reading or writing with `w.Dbfs.Open`. +This function returns a `dbfs.Handle` that is compatible with a subset of `io` +interfaces for reading, writing, and closing. + +Uploading a file from an [io.Reader]: ```go upload, _ := os.Open("/path/to/local/file.ext") -_ = w.Dbfs.Overwrite(ctx, "/path/to/remote/file", upload) +remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite) +io.Copy(remote, upload) +remote.Close() +``` +Downloading a file to an [io.Writer]: + +```go download, _ := os.Create("/path/to/local") -remote, _ := w.Dbfs.Open(ctx, "/path/to/remote") +remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeRead) _ = io.Copy(download, remote) ``` +### Reading into and writing from buffers + +You can read from or write to a DBFS file directly from a byte slice through +the convenience functions `w.Dbfs.ReadFile` and `w.Dbfs.WriteFile`. + +Uploading a file from a byte slice: + +```go +err := w.Dbfs.WriteFile(ctx, "/path/to/remote/file", []byte("Hello world!")) +``` + +Downloading a file into a byte slice: + +```go +buf, err := w.Dbfs.ReadFile(ctx, "/path/to/remote/file") +``` + ## `pflag.Value` for enums Databricks SDK for Go loosely integrates with [spf13/pflag](https://github.com/spf13/pflag) by implementing [pflag.Value](https://pkg.go.dev/github.com/spf13/pflag#Value) for all enum types. diff --git a/service/dbfs/doc.go b/service/dbfs/doc.go index a8dc9a3f8..27f3c8426 100644 --- a/service/dbfs/doc.go +++ b/service/dbfs/doc.go @@ -1,33 +1,49 @@ -// Databricks FileSystem (DBFS) API +// Databricks File System (DBFS) API // -// We strongly recommend using clients created via -// [github.com/databricks/databricks-sdk-go/workspaces.New] to simplify -// configuration experience. +// We recommend using a client created via [databricks.NewWorkspaceClient] +// to simplify the configuration experience. // -// Please use the high-level [DbfsAPI.Open] and [DbfsAPI.Overwrite] methods -// to work with remote files through Go's [io] interfaces. The return value -// of [DbfsAPI.Open] implements the [io.Reader] and [io.WriterTo] interfaces. -// The [io.WriterTo] interface is used by [io.Copy] and maximizes throughput by -// reading data with the DBFS maximum read chunk size of 1MB. +// # Reading and writing files // -// Internally, these methods wrap the low level [DbfsAPI.Create], -// [DbfsAPI.Close], [DbfsAPI.Read], and [DbfsAPI.AddBlock] methods: +// You can open a file on DBFS for reading or writing with [DbfsAPI.Open]. +// This function returns a [Handle] that is compatible with a subset of [io] +// interfaces for reading, writing, and closing. +// +// Uploading a file from an [io.Reader]: // // upload, _ := os.Open("/path/to/local/file.ext") -// _ = w.Dbfs.Overwrite(ctx, "/path/to/remote/file", upload) +// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite) +// io.Copy(remote, upload) +// remote.Close() +// +// Downloading a file to an [io.Writer]: // // download, _ := os.Create("/path/to/local") -// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote") +// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeRead) // _ = io.Copy(download, remote) // -// Moving files: +// # Reading and writing files from buffers +// +// You can read from or write to a DBFS file directly from a byte slice through +// the convenience functions [DbfsAPI.ReadFile] and [DbfsAPI.WriteFile]. +// +// Uploading a file from a byte slice: +// +// buf := []byte("Hello world!") +// _ = w.Dbfs.WriteFile(ctx, "/path/to/remote/file", buf) +// +// Downloading a file into a byte slice: +// +// buf, err := w.Dbfs.ReadFile(ctx, "/path/to/remote/file") +// +// # Moving files // // err := w.Dbfs.Move(ctx, dbfs.Move{ // SourcePath: "/remote/src/path", // DestinationPath: "/remote/dst/path", // }) // -// Creating directories: +// # Creating directories // // w.Dbfs.MkdirsByPath(ctx, "/remote/dir/path") package dbfs From 8a4a8f780b4034640dfc312df12d6028ddfaba4a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 21 Dec 2022 16:25:16 +0100 Subject: [PATCH 06/11] Fix test to match new error message --- internal/dbfs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/dbfs_test.go b/internal/dbfs_test.go index ba86d70cd..40e56791f 100644 --- a/internal/dbfs_test.go +++ b/internal/dbfs_test.go @@ -57,7 +57,7 @@ func TestAccDbfsHandleWrite(t *testing.T) { // Upload through [io.Writer] should fail because the file exists. { _, err := w.Dbfs.Open(ctx, path, dbfs.FileModeWrite) - require.ErrorContains(t, err, "dbfs: open: A file or directory already exists at the input path") + require.ErrorContains(t, err, "dbfs open: A file or directory already exists at the input path") } // Upload through [io.ReadFrom] with overwrite bit set. From 27fda6b01aee15d485faaa1e7724152dea539a4f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 22 Dec 2022 08:49:56 +0100 Subject: [PATCH 07/11] Update README.md Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f63237989..1f69447a7 100644 --- a/README.md +++ b/README.md @@ -488,7 +488,7 @@ Uploading a file from an [io.Reader]: ```go upload, _ := os.Open("/path/to/local/file.ext") remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite) -io.Copy(remote, upload) +_, _ = io.Copy(remote, upload) remote.Close() ``` From f540b15361a2d592da76eab6123936ec4e2fe028 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 22 Dec 2022 08:50:04 +0100 Subject: [PATCH 08/11] Update README.md Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1f69447a7..134f1339b 100644 --- a/README.md +++ b/README.md @@ -489,7 +489,7 @@ Uploading a file from an [io.Reader]: upload, _ := os.Open("/path/to/local/file.ext") remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite) _, _ = io.Copy(remote, upload) -remote.Close() +_ = remote.Close() ``` Downloading a file to an [io.Writer]: From 53ee739947d1b762532889b55ba64db3d492ec96 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 22 Dec 2022 08:50:31 +0100 Subject: [PATCH 09/11] Update README.md Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 134f1339b..849b8d92a 100644 --- a/README.md +++ b/README.md @@ -487,7 +487,7 @@ Uploading a file from an [io.Reader]: ```go upload, _ := os.Open("/path/to/local/file.ext") -remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite) +remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite|dbfs.FileModeOverwrite) _, _ = io.Copy(remote, upload) _ = remote.Close() ``` From 175a75811951834b0d487775cd0ad84a03468e8a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 22 Dec 2022 08:53:21 +0100 Subject: [PATCH 10/11] Doc fixes --- README.md | 8 ++++---- service/dbfs/doc.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 849b8d92a..5df02f282 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ The Databricks SDK for Go includes functionality to accelerate development with - [Paginated responses](#paginated-responses) - [GetByName utility methods](#getbyname-utility-methods) - [Node type and Databricks Runtime selectors](#node-type-and-databricks-runtime-selectors) -- [io.Reader integration for DBFS](#ioreader-integration-for-dbfs) +- [Integration with `io` interfaces for DBFS](#integration-with-io-interfaces-for-dbfs) - [Logging](#logging) - [Interface stability](#interface-stability) @@ -483,7 +483,7 @@ You can open a file on DBFS for reading or writing with `w.Dbfs.Open`. This function returns a `dbfs.Handle` that is compatible with a subset of `io` interfaces for reading, writing, and closing. -Uploading a file from an [io.Reader]: +Uploading a file from an `io.Reader`: ```go upload, _ := os.Open("/path/to/local/file.ext") @@ -492,12 +492,12 @@ _, _ = io.Copy(remote, upload) _ = remote.Close() ``` -Downloading a file to an [io.Writer]: +Downloading a file to an `io.Writer`: ```go download, _ := os.Create("/path/to/local") remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeRead) -_ = io.Copy(download, remote) +_, _ = io.Copy(download, remote) ``` ### Reading into and writing from buffers diff --git a/service/dbfs/doc.go b/service/dbfs/doc.go index 27f3c8426..0779ace43 100644 --- a/service/dbfs/doc.go +++ b/service/dbfs/doc.go @@ -12,7 +12,7 @@ // Uploading a file from an [io.Reader]: // // upload, _ := os.Open("/path/to/local/file.ext") -// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite) +// remote, _ := w.Dbfs.Open(ctx, "/path/to/remote/file", dbfs.FileModeWrite|dbfs.FileModeOverwrite) // io.Copy(remote, upload) // remote.Close() // From 816e23c4b2a09c124776e0fdfca4046644d681da Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 22 Dec 2022 09:07:30 +0100 Subject: [PATCH 11/11] Split out ReadFile and WriteFile to separate test --- internal/dbfs_test.go | 49 ++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/internal/dbfs_test.go b/internal/dbfs_test.go index 40e56791f..7edb69e5a 100644 --- a/internal/dbfs_test.go +++ b/internal/dbfs_test.go @@ -24,7 +24,7 @@ func (buf hashable) Hash() uint32 { return h.Sum32() } -func TestAccDbfsHandleWrite(t *testing.T) { +func TestAccDbfsOpen(t *testing.T) { ctx, w := workspaceTest(t) if w.Config.IsGcp() { t.Skip("dbfs not available on gcp") @@ -75,17 +75,6 @@ func TestAccDbfsHandleWrite(t *testing.T) { assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) } - // Upload through [dbfs.DbfsAPI.WriteFile]. - { - err := w.Dbfs.WriteFile(ctx, path, in) - require.NoError(t, err) - - // Verify contents hash. - out, err := w.Dbfs.ReadFile(ctx, path) - require.NoError(t, err) - assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) - } - // Download through [io.Reader] and let [io.ReadAll] determine buffer size. { handle, err := w.Dbfs.Open(ctx, path, dbfs.FileModeRead) @@ -113,6 +102,42 @@ func TestAccDbfsHandleWrite(t *testing.T) { } } +func TestAccDbfsReadFileWriteFile(t *testing.T) { + ctx, w := workspaceTest(t) + if w.Config.IsGcp() { + t.Skip("dbfs not available on gcp") + } + + path := RandomName("/tmp/.sdk/fake") + rand.Seed(time.Now().UnixNano()) + in := make([]byte, 1.44*1e6) + _, _ = rand.Read(in) + + defer w.Dbfs.Delete(ctx, dbfs.Delete{ + Path: path, + }) + + // Write file to DBFS. + err := w.Dbfs.WriteFile(ctx, path, in) + require.NoError(t, err) + + // Verify contents hash. + out, err := w.Dbfs.ReadFile(ctx, path) + require.NoError(t, err) + assert.Equal(t, hashable(in).Hash(), hashable(out).Hash()) + + hello := []byte("Hello world!") + + // Writing to the same path should truncate the existing file. + err = w.Dbfs.WriteFile(ctx, path, hello) + require.NoError(t, err) + + // Verify contents hash. + out, err = w.Dbfs.ReadFile(ctx, path) + require.NoError(t, err) + assert.Equal(t, hashable(hello).Hash(), hashable(out).Hash()) +} + func TestAccListDbfsIntegration(t *testing.T) { ctx, w := workspaceTest(t) if w.Config.IsGcp() {