Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 150 additions & 29 deletions crates/cli/src/commands/mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Moves objects between locations (copy + delete).

use clap::Args;
use rc_core::{AliasManager, ObjectStore as _, ParsedPath, RemotePath, parse_path};
use rc_core::{AliasManager, ListOptions, ObjectStore as _, ParsedPath, RemotePath, parse_path};
use rc_s3::S3Client;
use serde::Serialize;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -273,39 +273,160 @@ async fn move_s3_to_s3(
return ExitCode::Success;
}

// Copy
match client.copy_object(src, dst).await {
Ok(info) => {
// Delete source
if let Err(e) = client.delete_object(src).await {
formatter.error(&format!("Copied but failed to delete source: {e}"));
return ExitCode::GeneralError;
}
// Recursive move for prefix/directory semantics.
if args.recursive {
let mut continuation_token: Option<String> = None;
let mut moved_count = 0usize;
let mut error_count = 0usize;
let src_prefix = src.key.clone();

loop {
let list_opts = ListOptions {
recursive: true,
continuation_token: continuation_token.clone(),
..Default::default()
};

let list_result = match client.list_objects(src, list_opts).await {
Ok(result) => result,
Err(e) => {
formatter.error(&format!("Failed to list source objects: {e}"));
return ExitCode::NetworkError;
}
};

for item in &list_result.items {
if item.is_dir {
continue;
}

let relative = if src_prefix.is_empty() {
item.key.clone()
} else if let Some(rest) = item.key.strip_prefix(&src_prefix) {
rest.trim_start_matches('/').to_string()
} else {
item.key.clone()
};

if formatter.is_json() {
let output = MvOutput {
status: "success",
source: src_display,
target: dst_display,
size_bytes: info.size_bytes,
let target_key = if dst.key.is_empty() {
relative.clone()
} else if dst.key.ends_with('/') {
format!("{}{}", dst.key, relative)
} else {
format!("{}/{}", dst.key, relative)
};
formatter.json(&output);
} else {
formatter.println(&format!(
"{src_display} -> {dst_display} ({})",
info.size_human.unwrap_or_default()
));

let src_obj = RemotePath::new(&src.alias, &src.bucket, &item.key);
let dst_obj = RemotePath::new(&dst.alias, &dst.bucket, &target_key);
let src_obj_display = format!("{}/{}/{}", src.alias, src.bucket, src_obj.key);
let dst_obj_display = format!("{}/{}/{}", dst.alias, dst.bucket, dst_obj.key);
Comment thread
overtrue marked this conversation as resolved.
Outdated

match client.copy_object(&src_obj, &dst_obj).await {
Ok(_) => match client.delete_object(&src_obj).await {
Ok(()) => {
moved_count += 1;
if !formatter.is_json() {
formatter
.println(&format!("{src_obj_display} -> {dst_obj_display}"));
}
}
Err(e) => {
error_count += 1;
formatter.error(&format!(
"Copied but failed to delete source '{src_obj_display}': {e}"
));
if !args.continue_on_error {
return ExitCode::GeneralError;
}
}
},
Err(e) => {
error_count += 1;
formatter.error(&format!(
"Failed to move '{src_obj_display}' -> '{dst_obj_display}': {e}"
));
if !args.continue_on_error {
return ExitCode::NetworkError;
}
}
}
}

if !list_result.truncated {
break;
}
continuation_token = list_result.continuation_token.clone();
Comment thread
overtrue marked this conversation as resolved.
Outdated
}

if formatter.is_json() {
#[derive(Serialize)]
struct MvRecursiveOutput {
status: &'static str,
source: String,
target: String,
moved: usize,
errors: usize,
}

formatter.json(&MvRecursiveOutput {
status: if error_count == 0 {
"success"
} else {
"partial"
},
source: src_display,
target: dst_display,
moved: moved_count,
errors: error_count,
});
} else if error_count == 0 {
formatter.println(&format!("Moved {moved_count} object(s)."));
} else {
formatter.println(&format!(
"Moved {moved_count} object(s), {error_count} failed."
));
}

if error_count == 0 {
ExitCode::Success
} else {
ExitCode::GeneralError
}
Err(e) => {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
formatter.error(&format!("Source not found: {src_display}"));
ExitCode::NotFound
} else {
formatter.error(&format!("Failed to move: {e}"));
ExitCode::NetworkError
} else {
// Copy
match client.copy_object(src, dst).await {
Ok(info) => {
// Delete source
if let Err(e) = client.delete_object(src).await {
formatter.error(&format!("Copied but failed to delete source: {e}"));
return ExitCode::GeneralError;
}

if formatter.is_json() {
let output = MvOutput {
status: "success",
source: src_display,
target: dst_display,
size_bytes: info.size_bytes,
};
formatter.json(&output);
} else {
formatter.println(&format!(
"{src_display} -> {dst_display} ({})",
info.size_human.unwrap_or_default()
));
}
ExitCode::Success
}
Err(e) => {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("NoSuchKey") {
formatter.error(&format!("Source not found: {src_display}"));
ExitCode::NotFound
} else {
formatter.error(&format!("Failed to move: {e}"));
ExitCode::NetworkError
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/admin/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,14 @@ pub struct BucketQuota {
pub size: u64,

/// Quota type (currently only HARD)
#[serde(default = "default_quota_type")]
pub quota_type: String,
}

fn default_quota_type() -> String {
"HARD".to_string()
}
Comment thread
overtrue marked this conversation as resolved.

#[cfg(test)]
mod tests {
use super::*;
Expand Down
21 changes: 15 additions & 6 deletions crates/s3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use rc_core::{
};
use tokio::io::AsyncReadExt;

const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = 5 * 1024 * 1024 * 1024;
/// Keep single-part uploads small to avoid backend incompatibilities with
/// streaming aws-chunked payloads.
const SINGLE_PUT_OBJECT_MAX_SIZE: u64 = 64 * 1024 * 1024;
Comment thread
overtrue marked this conversation as resolved.
Outdated

/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification)
/// and custom CA bundles. Used when `alias.insecure = true` or `alias.ca_bundle.is_some()`.
Expand Down Expand Up @@ -196,6 +198,14 @@ impl S3Client {
// Build S3 client with path-style addressing for compatibility
let s3_config = aws_sdk_s3::config::Builder::from(&config)
.force_path_style(alias.bucket_lookup == "path" || alias.bucket_lookup == "auto")
// Improve compatibility with S3-compatible backends by only sending request
// checksums when the operation explicitly requires them.
.request_checksum_calculation(
aws_sdk_s3::config::RequestChecksumCalculation::WhenRequired,
)
.response_checksum_validation(
aws_sdk_s3::config::ResponseChecksumValidation::WhenRequired,
)
.build();

let client = aws_sdk_s3::Client::from_conf(s3_config);
Expand Down Expand Up @@ -270,11 +280,10 @@ impl S3Client {
content_type: Option<&str>,
file_size: u64,
) -> Result<ObjectInfo> {
let body = aws_sdk_s3::primitives::ByteStream::read_from()
.path(file_path)
.build()
let data = tokio::fs::read(file_path)
.await
.map_err(|e| Error::General(format!("build request body: {e}")))?;
.map_err(|e| Error::General(format!("read file '{}': {e}", file_path.display())))?;
let body = aws_sdk_s3::primitives::ByteStream::from(data);

let mut request = self
.inner
Expand Down Expand Up @@ -1188,7 +1197,7 @@ mod tests {
assert!(!S3Client::should_use_multipart(0));
assert!(!S3Client::should_use_multipart(1024 * 1024));
assert!(!S3Client::should_use_multipart(
crate::multipart::DEFAULT_PART_SIZE + 1
crate::multipart::DEFAULT_PART_SIZE
));
assert!(!S3Client::should_use_multipart(SINGLE_PUT_OBJECT_MAX_SIZE));
}
Expand Down