Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions net/curl/inc/ROOT/RCurlConnection.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public:
RStatus SendRangesReq(std::size_t N, RUserRange *ranges);
/// Uploads data to the URL using an HTTP PUT request.
RStatus SendPutReq(const unsigned char *data, std::size_t length);
/// Retargets the connection to `url` (reusing it, so curl can keep the connection alive across
/// requests to the same host) and uploads data with an HTTP PUT request.
RStatus SendPutReq(const std::string &url, const unsigned char *data, std::size_t length);
Comment on lines +122 to +124

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer making SetUrl public rather than an overload.


const std::string &GetEscapedUrl() const { return fEscapedUrl; }

Expand Down
9 changes: 9 additions & 0 deletions net/curl/src/RCurlConnection.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,15 @@ ROOT::Internal::RCurlConnection::SendPutReq(const unsigned char *data, std::size
return status;
}

ROOT::Internal::RCurlConnection::RStatus
ROOT::Internal::RCurlConnection::SendPutReq(const std::string &url, const unsigned char *data, std::size_t length)
{
auto result = SetUrl(url);
if (!result)
result.Throw();
return SendPutReq(data, length);
}

void ROOT::Internal::RCurlConnection::SetCredentials(const RS3Credentials &credentials)
{
ClearCredentials();
Expand Down
28 changes: 26 additions & 2 deletions net/curl/test/curl_connection.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,30 @@ TEST(RCurlConnection, Put)
EXPECT_EQ(std::string(reinterpret_cast<const char *>(payload), payloadLen), body);
}

TEST(RCurlConnection, PutWithUrl)
{
TServerSocket sock(0, false, TServerSocket::kDefaultBacklog, -1, ESocketBindOption::kInaddrLoopback);
const std::string baseUrl =
std::string("http://") + sock.GetLocalInetAddress().GetHostAddress() + ":" + std::to_string(sock.GetLocalPort());

const unsigned char payload[] = "object body";
const std::size_t payloadLen = sizeof(payload) - 1; // exclude null terminator

std::string headers;
std::string body;
std::thread threadRecv(TaskRecvPut, &sock, &headers, &body);

// The connection is created with the base URL; the SendPutReq overload retargets it to a per-request
// URL (the mechanism that lets one connection be reused across many objects on the same host).
ROOT::Internal::RCurlConnection conn(baseUrl);
auto status = conn.SendPutReq(baseUrl + "/myobject/42", payload, payloadLen);

threadRecv.join();
EXPECT_TRUE(static_cast<bool>(status));
EXPECT_EQ(0u, headers.find("PUT /myobject/42 ")) << headers.substr(0, 40);
EXPECT_EQ(std::string(reinterpret_cast<const char *>(payload), payloadLen), body);
}

/// GET (range read) after PUT on the same handle — verifies that WRITEFUNCTION is set correctly
/// in SendRangesReq after a PUT cleared it.
TEST(RCurlConnection, GetAfterPut)
Expand Down Expand Up @@ -194,8 +218,8 @@ TEST(RCurlConnection, GetAfterPut)
}
}

std::string response = "HTTP/1.1 200 OK\r\nContent-Length: " + std::to_string(expectedBody.size()) +
"\r\n\r\n" + expectedBody;
std::string response =
"HTTP/1.1 200 OK\r\nContent-Length: " + std::to_string(expectedBody.size()) + "\r\n\r\n" + expectedBody;
s->SendRaw(response.data(), response.size());
s->Close();
};
Expand Down
7 changes: 7 additions & 0 deletions tree/ntuple/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
# @author Jakob Blomer CERN
############################################################################

# RNTuple optionally writes to S3-compatible object storage via libcurl (RCurlHttp).
if(curl)
set(ROOTNTuple_OPTIONAL_DEPENDENCIES RCurlHttp)
endif()

ROOT_STANDARD_LIBRARY_PACKAGE(ROOTNTuple
HEADERS
ROOT/RCluster.hxx
Expand Down Expand Up @@ -100,6 +105,7 @@ DEPENDENCIES
Imt
RIO
ROOTVecOps
${ROOTNTuple_OPTIONAL_DEPENDENCIES}
)

target_link_libraries(ROOTNTuple PRIVATE xxHash::xxHash)
Expand All @@ -125,6 +131,7 @@ endif()
if(curl)
set(ROOTNTuple_EXTRA_HEADERS ${ROOTNTuple_EXTRA_HEADERS} ROOT/RPageStorageS3.hxx)
target_sources(ROOTNTuple PRIVATE src/RPageStorageS3.cxx)
target_compile_definitions(ROOTNTuple PRIVATE R__ENABLE_S3)
target_link_libraries(ROOTNTuple PRIVATE nlohmann_json::nlohmann_json)
endif()

Expand Down
73 changes: 70 additions & 3 deletions tree/ntuple/inc/ROOT/RPageStorageS3.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
#ifndef ROOT_RPageStorageS3
#define ROOT_RPageStorageS3

#include <ROOT/RCurlConnection.hxx>
#include <ROOT/RError.hxx>
#include <ROOT/RNTuple.hxx>
#include <ROOT/RPageStorage.hxx>

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

namespace ROOT {
namespace Experimental {
Expand All @@ -43,9 +47,10 @@ struct RNTupleAnchorS3 {
std::uint16_t fVersionMajor = RNTuple::kVersionMajor;
std::uint16_t fVersionMinor = RNTuple::kVersionMinor;
std::uint16_t fVersionPatch = RNTuple::kVersionPatch;
/// Pattern for resolving object IDs to full S3 URLs.
/// ${baseurl} is replaced with the anchor URL, ${objid} with the numeric object ID.
std::string fUrlTemplate;
/// Pattern for resolving object IDs to full S3 URLs. ${baseurl} is replaced with the anchor URL,
/// ${objid} with the numeric object ID. Defaults to the scheme this writer uses; the reader
/// overrides it from the stored anchor.
std::string fUrlTemplate = "${baseurl}/${objid}";
/// Object ID and byte offset of the compressed header within the S3 object
std::uint64_t fHeaderObjId = 0;
std::uint64_t fHeaderOffset = 0;
Expand All @@ -67,6 +72,68 @@ struct RNTupleAnchorS3 {
static RResult<RNTupleAnchorS3> CreateFromJSON(const std::string &json);
};

/// \brief Parsed components of an S3-scheme URI
struct RS3Url {
std::string fHttpUrl; ///< The full HTTP(S) URL (the s3 scheme prefix translated to http/https)
std::string fScheme; ///< "http" or "https"
};

/// \brief Translate an S3-scheme URI into its HTTP equivalent.
///
/// Accepts `s3+http://host/bucket/path`, `s3+https://host/bucket/path`, and `s3://bucket/path`
/// (the bare form defaults to the AWS endpoint). Throws RException on an unrecognized scheme.
RS3Url ParseS3Url(std::string_view uri);

// clang-format off
/**
\class ROOT::Experimental::Internal::RPageSinkS3
\ingroup NTuple
\brief Storage provider that writes ntuple pages into S3-compatible object storage.

Currently implements Mode B (one sealed page per S3 object, kTypeObject64 locators).
Mode A (multiple packed pages per object, kTypeMulti locators) will be added separately.

\warning The S3 backend is experimental and under active development.
*/
// clang-format on
class RPageSinkS3 : public ROOT::Internal::RPagePersistentSink {
private:
/// HTTP base URL for this ntuple (derived from the s3 scheme URI); never has a trailing slash
std::string fBaseUrl;
/// One HTTP connection reused for every upload, so curl keeps it alive across objects on the same
/// host instead of re-handshaking per object.
ROOT::Internal::RCurlConnection fConnection;
/// Object ID counter; incremented for each object written.
std::uint64_t fObjectId{0};
/// Tracks the number of bytes committed to the current cluster (reset in StageClusterImpl)
std::uint64_t fNBytesCurrentCluster{0};
/// Anchor metadata populated during the write path and uploaded last in CommitDatasetImpl
RNTupleAnchorS3 fAnchor;

/// Resolve a numeric object ID to its full HTTP URL
std::string MakeObjectUrl(std::uint64_t objId) const;
/// Upload raw bytes to the given S3 URL via an HTTP PUT request
void PutObject(const std::string &url, const unsigned char *data, std::size_t size);

protected:
using RPagePersistentSink::InitImpl;
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final;
RNTupleLocator
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
std::uint64_t StageClusterImpl() final;
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final;
using RPagePersistentSink::CommitDatasetImpl;
ROOT::Internal::RNTupleLink CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final;

public:
RPageSinkS3(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options);
~RPageSinkS3() override;

std::unique_ptr<ROOT::Internal::RPageSink>
CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const final;
}; // class RPageSinkS3

} // namespace Internal
} // namespace Experimental
} // namespace ROOT
Expand Down
15 changes: 15 additions & 0 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
#include <ROOT/RPageAllocator.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#include <ROOT/RPageStorageFile.hxx>
#include <ROOT/StringUtils.hxx>
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif
#ifdef R__ENABLE_S3
#include <ROOT/RPageStorageS3.hxx>
#endif

#include <Compression.h>
#include <TError.h>
Expand Down Expand Up @@ -190,6 +194,9 @@ ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_vie
throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif

if (ROOT::StartsWith(location, "ntpl+s3"))
throw RException(R__FAIL("S3 read support is not yet implemented."));

return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
}

Expand Down Expand Up @@ -904,6 +911,14 @@ ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::st
#endif
}

if (ROOT::StartsWith(location, "ntpl+s3")) {
#ifdef R__ENABLE_S3
return std::make_unique<ROOT::Experimental::Internal::RPageSinkS3>(ntupleName, location, options);
#else
throw RException(R__FAIL("This RNTuple build does not support S3."));
#endif
}

// Otherwise assume that the user wants us to create a file.
return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
}
Expand Down
Loading