Skip to content
112 changes: 86 additions & 26 deletions src/iceberg/arrow/arrow_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <limits>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <vector>

#include <arrow/buffer.h>
Expand All @@ -35,6 +36,8 @@
#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/s3/arrow_s3_internal.h"
#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"

namespace iceberg::arrow {
Expand Down Expand Up @@ -473,13 +476,53 @@ class ArrowOutputFile : public OutputFile {

} // namespace

Result<std::string> ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) {
/// \brief Select the file system that serves `location`.
///
/// With no registered prefixes the default file system is returned directly.
/// Otherwise the location is passed through LocationUtil::CanonicalizeS3Scheme
/// and the longest registered prefix accepted by LocationUtil::PathHasPrefix
/// wins (the earliest entry among equal-length matches); if nothing matches,
/// the default file system is returned.
const std::shared_ptr<::arrow::fs::FileSystem>& ArrowFileSystemFileIO::FileSystemForPath(
    std::string_view location) const {
  if (fs_by_prefix_.empty()) {
    return arrow_fs_;
  }

  const std::string normalized = LocationUtil::CanonicalizeS3Scheme(location);
  const auto none = fs_by_prefix_.cend();
  auto winner = none;
  size_t winner_len = 0;
  for (auto it = fs_by_prefix_.cbegin(); it != none; ++it) {
    const std::string& candidate = it->first;
    // Only a strictly longer prefix can displace the current winner, so the
    // length test runs before the prefix comparison.
    if (candidate.size() <= winner_len) {
      continue;
    }
    if (!LocationUtil::PathHasPrefix(normalized, candidate)) {
      continue;
    }
    winner = it;
    winner_len = candidate.size();
  }

  if (winner == none) {
    return arrow_fs_;
  }
  return winner->second;
}

/// \brief Replace the per-prefix routing table with one S3 file system per entry.
///
/// \param properties_by_prefix Pairs of (location prefix, S3 properties). One
/// file system is built from each property map via BuildArrowS3FileSystem.
/// \return OK on success. If building any file system fails, the error is
/// returned and the existing routing table is left unchanged, because the new
/// table is assembled in a local and only swapped in at the end. Without Arrow
/// S3 support this always returns NotSupported.
Status ArrowFileSystemFileIO::SetStorageCredentials(
    const std::vector<
        std::pair<std::string, std::unordered_map<std::string, std::string>>>&
        properties_by_prefix) {
#if ICEBERG_S3_ENABLED
  std::vector<std::pair<std::string, std::shared_ptr<::arrow::fs::FileSystem>>>
      fs_by_prefix;
  fs_by_prefix.reserve(properties_by_prefix.size());
  for (const auto& [prefix, properties] : properties_by_prefix) {
    ICEBERG_ASSIGN_OR_RAISE(auto fs, BuildArrowS3FileSystem(properties));
    // FileSystemForPath() compares prefixes against
    // LocationUtil::CanonicalizeS3Scheme(location); normalize the prefix with
    // the same helper so both sides of the comparison use one form.
    fs_by_prefix.emplace_back(LocationUtil::CanonicalizeS3Scheme(prefix),
                              std::move(fs));
  }
  fs_by_prefix_ = std::move(fs_by_prefix);
  return {};
#else
  (void)properties_by_prefix;
  return NotSupported("S3 storage credentials require Arrow S3 support");
#endif
}

Result<std::string> ArrowFileSystemFileIO::ResolvePath(
const std::shared_ptr<::arrow::fs::FileSystem>& fs,
const std::string& file_location) {
const auto pos = file_location.find("://");
if (pos == std::string::npos) {
return file_location;
}

auto path = arrow_fs_->PathFromUri(file_location);
auto path = fs->PathFromUri(file_location);
if (path.ok()) {
return std::move(path).ValueOrDie();
}
Expand All @@ -502,14 +545,14 @@ Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenArrowInputStream(
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

if (auto arrow_io = std::dynamic_pointer_cast<ArrowFileSystemFileIO>(io)) {
ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path));
const auto& fs = arrow_io->FileSystemForPath(path);
ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(fs, path));
::arrow::fs::FileInfo file_info(resolved_path, ::arrow::fs::FileType::File);
if (length.has_value()) {
ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(*length));
file_info.set_size(size);
}
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input,
arrow_io->arrow_fs_->OpenInputFile(file_info));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, fs->OpenInputFile(file_info));
return input;
}

Expand All @@ -533,16 +576,15 @@ Result<std::shared_ptr<::arrow::io::OutputStream>> OpenArrowOutputStream(
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

if (auto arrow_io = std::dynamic_pointer_cast<ArrowFileSystemFileIO>(io)) {
ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(path));
const auto& fs = arrow_io->FileSystemForPath(path);
ICEBERG_ASSIGN_OR_RAISE(auto resolved_path, arrow_io->ResolvePath(fs, path));
if (!overwrite) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info,
arrow_io->arrow_fs_->GetFileInfo(resolved_path));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto info, fs->GetFileInfo(resolved_path));
if (info.type() != ::arrow::fs::FileType::NotFound) {
return AlreadyExists("File already exists: {}", path);
}
}
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output,
arrow_io->arrow_fs_->OpenOutputStream(resolved_path));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, fs->OpenOutputStream(resolved_path));
return output;
}

Expand All @@ -558,42 +600,60 @@ Result<std::shared_ptr<::arrow::io::OutputStream>> OpenArrowOutputStream(

/// \brief Create an input file for `file_location` without a known length.
///
/// The file system is chosen by FileSystemForPath, and the location is resolved
/// to a path on that same file system before the input file is constructed.
Result<std::unique_ptr<InputFile>> ArrowFileSystemFileIO::NewInputFile(
    std::string file_location) {
  const auto& fs = FileSystemForPath(file_location);
  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(fs, file_location));
  return std::make_unique<ArrowInputFile>(fs, std::move(file_location), std::move(path),
                                          std::nullopt);
}

/// \brief Create an input file for `file_location` with a caller-supplied length.
///
/// `length` is converted with ToInt64Length first, so a conversion error is
/// returned before any file system is selected. The file system is then chosen
/// by FileSystemForPath and the location resolved against it.
Result<std::unique_ptr<InputFile>> ArrowFileSystemFileIO::NewInputFile(
    std::string file_location, size_t length) {
  ICEBERG_ASSIGN_OR_RAISE(auto size, ToInt64Length(length));
  const auto& fs = FileSystemForPath(file_location);
  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(fs, file_location));
  return std::make_unique<ArrowInputFile>(fs, std::move(file_location), std::move(path),
                                          size);
}

/// \brief Create an output file for `file_location`.
///
/// The file system is chosen by FileSystemForPath, and the location is resolved
/// to a path on that same file system before the output file is constructed.
Result<std::unique_ptr<OutputFile>> ArrowFileSystemFileIO::NewOutputFile(
    std::string file_location) {
  const auto& fs = FileSystemForPath(file_location);
  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(fs, file_location));
  return std::make_unique<ArrowOutputFile>(fs, std::move(file_location), std::move(path));
}

/// \brief Delete a file at the given location.
///
/// The delete is issued on the file system chosen by FileSystemForPath, using
/// the path resolved against that same file system.
Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
  const auto& fs = FileSystemForPath(file_location);
  ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(fs, file_location));
  ICEBERG_ARROW_RETURN_NOT_OK(fs->DeleteFile(path));
  return {};
}

/// \brief Delete files at the given locations.
///
/// With no per-prefix file systems registered, all locations are resolved
/// against the default file system and removed in one batched call. Otherwise
/// each location is routed through FileSystemForPath, the resolved paths are
/// grouped by file system, and one batched delete is issued per group.
///
/// Every location is resolved before any delete is issued, so a resolution
/// error deletes nothing. In the routed case, a failing batch returns
/// immediately; batches already issued on other file systems are not undone.
Status ArrowFileSystemFileIO::DeleteFiles(
    const std::vector<std::string>& file_locations) {
  if (fs_by_prefix_.empty()) {
    // No per-prefix routing: one batched delete on the default file system.
    std::vector<std::string> paths;
    paths.reserve(file_locations.size());
    for (const auto& file_location : file_locations) {
      ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(arrow_fs_, file_location));
      paths.push_back(std::move(path));
    }
    ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFiles(paths));
    return {};
  }

  // Paths may route to different file systems: group by fs, then batch per fs.
  // Raw pointers are used as keys only; the file systems are kept alive by
  // arrow_fs_ and fs_by_prefix_ for the duration of this call.
  std::unordered_map<::arrow::fs::FileSystem*, std::vector<std::string>> paths_by_fs;
  for (const auto& file_location : file_locations) {
    const auto& fs = FileSystemForPath(file_location);
    ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(fs, file_location));
    paths_by_fs[fs.get()].push_back(std::move(path));
  }
  for (auto& [fs, paths] : paths_by_fs) {
    ICEBERG_ARROW_RETURN_NOT_OK(fs->DeleteFiles(paths));
  }
  return {};
}

Expand Down
24 changes: 21 additions & 3 deletions src/iceberg/arrow/arrow_io_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include <arrow/filesystem/type_fwd.h>
Expand Down Expand Up @@ -52,7 +55,8 @@ OpenArrowOutputStream(const std::shared_ptr<FileIO>& io, const std::string& path
bool overwrite = true);

/// \brief A concrete implementation of FileIO for Arrow file system.
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO,
public SupportsStorageCredentials {
public:
explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs)
: arrow_fs_(std::move(arrow_fs)) {}
Expand Down Expand Up @@ -81,6 +85,12 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
/// \brief Delete files at the given locations.
Status DeleteFiles(const std::vector<std::string>& file_locations) override;

/// \brief Build one S3 file system per credential prefix for per-path routing.
Status SetStorageCredentials(
const std::vector<
std::pair<std::string, std::unordered_map<std::string, std::string>>>&
properties_by_prefix) override;

/// \brief Get the Arrow file system.
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }

Expand All @@ -92,10 +102,18 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
friend Result<std::shared_ptr<::arrow::io::OutputStream>> OpenArrowOutputStream(
const std::shared_ptr<FileIO>& io, const std::string& path, bool overwrite);

/// \brief Resolve a file location to a filesystem path.
Result<std::string> ResolvePath(const std::string& file_location);
/// \brief Pick the file system for `location` (longest matching prefix, else
/// the default).
const std::shared_ptr<::arrow::fs::FileSystem>& FileSystemForPath(
std::string_view location) const;

/// \brief Resolve a file location to a filesystem path using `fs`.
Result<std::string> ResolvePath(const std::shared_ptr<::arrow::fs::FileSystem>& fs,
const std::string& file_location);

std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
std::vector<std::pair<std::string, std::shared_ptr<::arrow::fs::FileSystem>>>
fs_by_prefix_;
};

} // namespace iceberg::arrow
71 changes: 47 additions & 24 deletions src/iceberg/arrow/s3/arrow_s3_file_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <optional>
#include <string>
#include <string_view>
#include <utility>

#include <arrow/filesystem/filesystem.h>
#if ICEBERG_S3_ENABLED
Expand All @@ -30,6 +31,7 @@
#include "iceberg/arrow/arrow_io_internal.h"
#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/s3/arrow_s3_internal.h"
#include "iceberg/arrow/s3/s3_properties.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
Expand Down Expand Up @@ -74,6 +76,23 @@ Status EnsureS3Initialized() {
return {};
}

// Splits any URI scheme off `endpoint` into `options.scheme`, returning the bare
// host[:port] that Arrow's `endpoint_override` expects.
//
// - "http://localhost:9000" sets options.scheme to "http" and returns
//   "localhost:9000".
// - An endpoint without "://" leaves options.scheme untouched.
// - An empty scheme (endpoint starting with "://") also leaves options.scheme
//   untouched rather than overwriting it with an empty string.
// - Trailing '/' characters are dropped, so "http://localhost:9000/" yields
//   "localhost:9000".
std::string SplitEndpointScheme(std::string_view endpoint,
                                ::arrow::fs::S3Options& options) {
  constexpr std::string_view kSchemeSeparator = "://";
  if (const auto pos = endpoint.find(kSchemeSeparator);
      pos != std::string_view::npos) {
    if (pos > 0) {
      options.scheme = std::string(endpoint.substr(0, pos));
    }
    endpoint.remove_prefix(pos + kSchemeSeparator.size());
  }
  while (!endpoint.empty() && endpoint.back() == '/') {
    endpoint.remove_suffix(1);
  }
  return std::string(endpoint);
}

#endif

} // namespace

#if ICEBERG_S3_ENABLED

/// \brief Configure S3Options from a properties map.
///
/// \param properties The configuration properties map.
Expand All @@ -100,26 +119,26 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
}

// Configure region
if (const auto* region = FindProperty(properties, S3Properties::kRegion);
region != nullptr) {
// Prefer the standard `client.region`; fall back to legacy `s3.region`.
const auto* region = FindProperty(properties, S3Properties::kClientRegion);
if (region == nullptr) {
region = FindProperty(properties, S3Properties::kRegion);
}
if (region != nullptr) {
options.region = *region;
}

// Configure endpoint (for MinIO, LocalStack, etc.)
// Configure endpoint (for MinIO, LocalStack, OSS, etc.) from `s3.endpoint` or
// the AWS standard env vars.
if (const auto* endpoint = FindProperty(properties, S3Properties::kEndpoint);
endpoint != nullptr) {
options.endpoint_override = *endpoint;
} else {
// Fall back to AWS standard environment variables for endpoint override
const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3");
if (s3_endpoint_env != nullptr) {
options.endpoint_override = s3_endpoint_env;
} else {
const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL");
if (endpoint_env != nullptr) {
options.endpoint_override = endpoint_env;
}
}
options.endpoint_override = SplitEndpointScheme(*endpoint, options);
} else if (const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3");
s3_endpoint_env != nullptr) {
options.endpoint_override = SplitEndpointScheme(s3_endpoint_env, options);
} else if (const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL");
endpoint_env != nullptr) {
options.endpoint_override = SplitEndpointScheme(endpoint_env, options);
}

ICEBERG_ASSIGN_OR_RAISE(const auto path_style_access,
Expand All @@ -128,11 +147,11 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
options.force_virtual_addressing = !*path_style_access;
}

// Configure SSL
// Explicit `s3.ssl.enabled` overrides any endpoint-derived scheme.
ICEBERG_ASSIGN_OR_RAISE(const auto ssl_enabled,
ParseOptionalBool(properties, S3Properties::kSslEnabled));
if (ssl_enabled.has_value() && !*ssl_enabled) {
options.scheme = "http";
if (ssl_enabled.has_value()) {
options.scheme = *ssl_enabled ? "https" : "http";
}

// Configure timeouts
Expand All @@ -154,17 +173,21 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
}
#endif

} // namespace

Result<std::unique_ptr<FileIO>> MakeS3FileIO(
const std::unordered_map<std::string, std::string>& properties) {
#if ICEBERG_S3_ENABLED
/// \brief Build an Arrow S3 file system configured from `properties`.
///
/// Returns the error from EnsureS3Initialized, ConfigureS3Options or
/// S3FileSystem::Make if any of those steps fails.
Result<std::shared_ptr<::arrow::fs::FileSystem>> BuildArrowS3FileSystem(
    const std::unordered_map<std::string, std::string>& properties) {
  // Bail out early when S3 initialization did not succeed.
  ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());

  // Translate the property map into S3 options (uses default credentials if empty).
  ICEBERG_ASSIGN_OR_RAISE(auto s3_options, ConfigureS3Options(properties));
  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto s3_fs,
                                 ::arrow::fs::S3FileSystem::Make(s3_options));
  // Hand the concrete S3 file system back as the generic FileSystem interface.
  return std::shared_ptr<::arrow::fs::FileSystem>(std::move(s3_fs));
}
#endif

Result<std::unique_ptr<FileIO>> MakeS3FileIO(
const std::unordered_map<std::string, std::string>& properties) {
#if ICEBERG_S3_ENABLED
// Uses default credentials if properties are empty.
ICEBERG_ASSIGN_OR_RAISE(auto fs, BuildArrowS3FileSystem(properties));
return std::make_unique<ArrowFileSystemFileIO>(std::move(fs));
#else
return NotSupported("Arrow S3 support is not enabled");
Expand Down
Loading
Loading