Skip to content
This repository was archived by the owner on Dec 5, 2024. It is now read-only.

[WIP] Chunked oci upload #32

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions include/curl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,28 @@ get_handle();

// Result of a transfer: status, headers and the body collected in `content`.
struct Response
{
    // NOTE(review): `status_code` looks superseded by `http_status` (the upload
    // code in this change only reads `http_status`) - confirm before removing.
    int status_code = 0;
    long http_status = 0;
    // NOTE(review): callers look up "location" in lower case, so keys are
    // presumably stored lower-cased - confirm against the header parser.
    std::map<std::string, std::string> header;
    // mutable so that json() can extract from the stream on a const Response
    mutable std::stringstream content;

    curl_off_t avg_speed = 0, downloaded_size = 0;
    std::string effective_url;

    // Parses `content` as JSON.
    //
    // Extraction consumes the stream, so a second call will not see the same
    // data again. On a parse error the raw buffer and the parser message are
    // printed and a null JSON value is returned instead of propagating the
    // exception.
    inline nlohmann::json json() const
    {
        try
        {
            nlohmann::json j;
            content >> j;
            return j;
        }
        // Public alias of the exception type; `nlohmann::detail` is internal.
        catch (const nlohmann::json::parse_error& err)
        {
            std::cout << "Could not parse " << content.str() << std::endl;
            std::cout << err.what() << std::endl;
            return nlohmann::json();
        }
    }
};

Expand Down Expand Up @@ -126,7 +135,6 @@ class CURLHandle
// setopt(CURLOPT_NOBODY, 0L);
// else
// setopt(CURLOPT_NOBODY, 1L);

// return *this;
// }

Expand Down
3 changes: 2 additions & 1 deletion include/mirrors/oci.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ Response
oci_upload(OCIMirror& mirror,
const std::string& reference,
const std::string& tag,
const fs::path& file);
const fs::path& file,
bool chunked = false);
40 changes: 30 additions & 10 deletions src/cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ progress_callback(DownloadTarget* t, curl_off_t total, curl_off_t done)
}

int
handle_upload(const std::vector<std::string>& files, const std::vector<std::string>& mirrors)
handle_upload(const std::vector<std::string>& files, const std::vector<std::string>& mirrors, bool chunked)
{
std::cout << "Uploading files " << std::endl;
std::string mirror_url = mirrors[0];
if (mirrors.size() > 1)
spdlog::warn("Only uploading to first mirror");
Expand All @@ -84,7 +85,16 @@ handle_upload(const std::vector<std::string>& files, const std::vector<std::stri
kof = KindOf::OCI;

if (kof != KindOf::HTTP)
url.set_scheme("https");
{
if (url.host() == "localhost")
{
url.set_scheme("http");
}
else
{
url.set_scheme("https");
}
}

spdlog::info("URL: {}", url.url());

Expand All @@ -103,9 +113,16 @@ handle_upload(const std::vector<std::string>& files, const std::vector<std::stri
}
std::string GH_SECRET = get_env("GHA_PAT");
std::string GH_USER = get_env("GHA_USER");

OCIMirror mirror(url.url(), "push", GH_USER, GH_SECRET);
oci_upload(mirror, GH_USER + "/" + dest, elems[2], elems[0]);
if (url.host() == "localhost")
{
OCIMirror mirror(url.url(), "push", "", "");
oci_upload(mirror, GH_USER + "/" + dest, elems[2], elems[0], chunked);
}
else
{
OCIMirror mirror(url.url(), "push", GH_USER, GH_SECRET);
oci_upload(mirror, GH_USER + "/" + dest, elems[2], elems[0], chunked);
}
}
else if (kof == KindOf::S3)
{
Expand Down Expand Up @@ -208,12 +225,13 @@ int
main(int argc, char** argv)
{
CLI::App app;

std::cout << "Powerloader ... " << std::endl;
bool resume = false;
std::vector<std::string> du_files;
std::vector<std::string> mirrors;
std::string file;
bool verbose = false;
static bool verbose = false;
static bool chunked = false;

CLI::App* s_dl = app.add_subcommand("download", "Download a file");
s_dl->add_option("files", du_files, "Files to download");
Expand All @@ -225,8 +243,10 @@ main(int argc, char** argv)
s_ul->add_option("files", du_files, "Files to upload");
s_ul->add_option("-m", mirrors, "Mirror to upload to");
s_ul->add_option("-f", file, "File from which to read upload / download files");
s_ul->add_flag("-c,--chunked", chunked, "Use chunked (resumable) upload (NOTE: this does not really work yet)");

app.add_option("-v", verbose, "Enable verbose output");
app.add_flag("-v", verbose, "Enable verbose output");
s_ul->add_flag("-v", verbose, "Enable verbose output");

CLI11_PARSE(app, argc, argv);

Expand Down Expand Up @@ -280,10 +300,10 @@ main(int argc, char** argv)
}
}
}
spdlog::set_level(spdlog::level::warn);
// spdlog::set_level(spdlog::level::warn);
if (app.got_subcommand("upload"))
{
return handle_upload(du_files, mirrors);
return handle_upload(du_files, mirrors, chunked);
}
if (app.got_subcommand("download"))
{
Expand Down
2 changes: 1 addition & 1 deletion src/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ read_callback(char* ptr, std::size_t size, std::size_t nmemb, T* stream)
// copy as much data as possible into the 'ptr' buffer, but no more than
// 'size' * 'nmemb' bytes!
stream->read(ptr, size * nmemb);
spdlog::info("Uploading {} bytes of data!", stream->gcount());
spdlog::info("M: Uploading {} bytes of data!", stream->gcount());
return stream->gcount();
}

Expand Down
188 changes: 173 additions & 15 deletions src/uploader/oci_upload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,198 @@

#include "mirrors/oci.hpp"

// State handed to read_chunk_callback through CURLOPT_READDATA: the window of
// the source stream that the current request is allowed to send.
template <class S>
struct UploadChunk
{
    // Absolute stream offset one past the last byte of the chunk.
    std::size_t endpos = 0;
    // Non-owning pointer to the stream the chunk is read from.
    S* stream = nullptr;

    // Running count of bytes handed to libcurl through this object.
    std::size_t total = 0;
};

// libcurl read callback that feeds at most `size * nmemb` bytes from
// `uchunk->stream` into `ptr`, never reading past `uchunk->endpos`.
//
// Returns the number of bytes written to `ptr`. 0 is returned when the chunk
// is exhausted, and also when the stream position cannot be determined or
// already lies beyond `endpos`, so that no out-of-range read is attempted.
template <class T>
std::size_t
read_chunk_callback(char* ptr, std::size_t size, std::size_t nmemb, T* uchunk)
{
    const std::size_t requested = size * nmemb;

    // tellg() yields -1 on a failed stream. Converting that (or a position
    // beyond endpos) to an unsigned value would wrap around and turn the
    // subtraction below into a huge read count, so both cases read nothing.
    const std::streamoff offset = uchunk->stream->tellg();
    std::size_t to_read = 0;
    if (offset >= 0 && static_cast<std::size_t>(offset) < uchunk->endpos)
    {
        const std::size_t remaining = uchunk->endpos - static_cast<std::size_t>(offset);
        to_read = requested < remaining ? requested : remaining;
    }

    uchunk->stream->read(ptr, static_cast<std::streamsize>(to_read));
    const std::size_t delivered = static_cast<std::size_t>(uchunk->stream->gcount());

    uchunk->total += delivered;
    spdlog::info("Uploading chunk {} bytes of requested {} data (Total now {})", delivered, requested, uchunk->total);

    return delivered;
}

// Sends the byte range [pos, pos + chunk_size) of `stream` as one PATCH
// request on the handle `c` and returns the response.
//
// `chunk_size` must be greater than 0: the Content-Range end offset is
// computed as `pos + chunk_size - 1`, which would wrap for an empty chunk at
// position 0.
template <class S>
Response
upload_oci_chunk(CURLHandle& c, S& stream, std::size_t pos, std::size_t chunk_size)
{
    // Function-local static, so the pointer given to CURLOPT_READDATA stays
    // valid after this function returns; `total` therefore keeps accumulating
    // across calls for the same stream type.
    // NOTE(review): shared between all callers - not safe for concurrent uploads.
    static UploadChunk<S> upload_chunk;
    upload_chunk.stream = &stream;
    upload_chunk.endpos = pos + chunk_size;

    c.setopt(CURLOPT_UPLOAD, 1L);
    // Content-Type: application/octet-stream
    // Content-Range: <range>
    // Content-Length: <length>
    stream.seekg(pos, stream.beg);
    c.add_header(fmt::format("Content-Range: {}-{}", pos, upload_chunk.endpos - 1));
    c.add_header(fmt::format("Content-Length: {}", chunk_size));
    c.add_header("Content-Type: application/octet-stream");

    c.setopt(CURLOPT_VERBOSE, 1L);
    // libcurl reads this option as curl_off_t; passing a std::size_t is only
    // correct where both types happen to have the same width.
    c.setopt(CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(chunk_size));
    c.setopt(CURLOPT_CUSTOMREQUEST, "PATCH");
    c.setopt(CURLOPT_READFUNCTION, read_chunk_callback<UploadChunk<S>>);
    c.setopt(CURLOPT_READDATA, &upload_chunk);
    return c.perform();
}

namespace detail
{
// Turns a `Location` value into a full URL: a value that already carries a
// scheme ("://") is returned unchanged, anything else is appended to the
// mirror's base URL.
std::string format_url(const OCIMirror& mirror, const std::string& location)
{
    if (contains(location, "://"))
    {
        return location;
    }
    return fmt::format("{}{}", mirror.url, location);
}

// Appends `param=value` to the query string of `url`.
//
// Uses '?' when the URL has no query yet and '&' otherwise. A URL that
// already ends in '?' or '&' gets no additional separator, so no empty query
// component ("?&") is produced. `param` and `value` are inserted verbatim
// (no percent-encoding).
std::string add_param(const std::string& url, const std::string& param, const std::string& value)
{
    std::string result = url;
    if (url.find('?') == std::string::npos)
    {
        result += '?';
    }
    else if (url.back() != '?' && url.back() != '&')
    {
        result += '&';
    }
    result += param;
    result += '=';
    result += value;
    return result;
}
}


// Uploads `file` as a blob in several requests:
//   1. POST to the pre-upload URL to open an upload session,
//   2. one PATCH per chunk of at most 500'000 bytes, each sent to the
//      `location` returned by the previous response,
//   3. PUT with the `digest` query parameter to finalize the blob.
//
// An empty file sends no PATCH request at all and goes straight to the PUT.
void upload_chunked(const fs::path& file, OCIMirror& mirror, const std::string& reference, const std::string& digest)
{
    std::string preupload_url = mirror.get_preupload_url(reference);
    auto session = CURLHandle(preupload_url)
                       .setopt(CURLOPT_CUSTOMREQUEST, "POST")
                       .add_headers(mirror.get_auth_headers(reference))
                       .add_header("Content-Length: 0")
                       .perform();

    assert(session.http_status == 202);

    std::string upload_url = detail::format_url(mirror, session.header["location"]);

    std::ifstream ufile(file, std::ios::in | std::ios::binary);

    // Ask the filesystem for the size (as oci_upload does) instead of
    // seeking to the end: tellg() on a stream that failed to open returns -1,
    // which would become a huge unsigned size.
    const std::size_t file_size = fs::file_size(file);
    const std::size_t max_chunk_size = 500'000;
    std::size_t pos = 0;

    while (pos < file_size)
    {
        const std::size_t remaining = file_size - pos;
        const std::size_t chunk_size = remaining < max_chunk_size ? remaining : max_chunk_size;

        CURLHandle chandle(upload_url);
        chandle.reset_headers();
        chandle.add_headers(mirror.get_auth_headers(reference));

        std::cout << "Uploading chunk " << pos << std::endl;
        auto chunk_response = upload_oci_chunk(chandle, ufile, pos, chunk_size);

        // Check every chunk - including the last one - before trusting the
        // `location` header of its response.
        assert(chunk_response.http_status == 202);

        upload_url = detail::format_url(mirror, chunk_response.header["location"]);
        pos += chunk_size;
    }

    auto upload_url_digest = detail::add_param(upload_url, "digest", digest);
    CURLHandle finalize;
    finalize.url(upload_url_digest);
    finalize.setopt(CURLOPT_CUSTOMREQUEST, "PUT");
    // libcurl reads this option as long, hence 1L rather than 1.
    finalize.setopt(CURLOPT_VERBOSE, 1L);
    finalize.add_headers(mirror.get_auth_headers(reference));
    finalize.add_header("Content-Type: application/octet-stream");
    auto res = finalize.perform();
    if (res.http_status != 201)
    {
        // Reported but not fatal: chunked upload is still work in progress.
        spdlog::warn("Finalizing chunked upload returned HTTP status {}", res.http_status);
    }
    std::cout << res.content.str() << std::endl;
}

// Uploads the whole content of `stream` as one blob:
//   1. POST to the pre-upload URL to obtain an upload location,
//   2. upload the stream to that location with the `digest` query parameter
//      attached.
//
// Returns the response of the upload request. `fsize` is accepted for the
// caller's convenience but is not used by this function.
template <class S>
auto upload_monolithically(S& stream, OCIMirror& mirror, const std::string& reference, const std::string& digest, std::size_t fsize)
{
    static_cast<void>(fsize);

    std::string preupload_url = mirror.get_preupload_url(reference);
    auto session = CURLHandle(preupload_url)
                       .setopt(CURLOPT_CUSTOMREQUEST, "POST")
                       .add_headers(mirror.get_auth_headers(reference))
                       .perform();

    assert(session.http_status == 202);
    std::string upload_url = detail::format_url(mirror, session.header["location"]);
    upload_url = detail::add_param(upload_url, "digest", digest);

    spdlog::info("Upload url: {}", upload_url);

    CURLHandle chandle(upload_url);

    chandle.setopt(CURLOPT_UPLOAD, 1L)
        .add_headers(mirror.get_auth_headers(reference))
        .add_header("Content-Type: application/octet-stream")
        .upload(stream);

    auto cres = chandle.perform();
    assert(cres.http_status == 201);
    return cres;
}


Response
oci_upload(OCIMirror& mirror,
const std::string& reference,
const std::string& tag,
const fs::path& file,
bool chunked)
{
std::string digest = fmt::format("sha256:{}", sha256sum(file));
std::size_t fsize = fs::file_size(file);

CURLHandle auth_handle;
if (mirror.prepare(reference, auth_handle))
{
auth_handle.perform();
}
std::cout << "Uploading " << file << std::endl;
if (chunked)
{
std::cout << "Uploading chunked" << std::endl;
upload_chunked(file, mirror, reference, digest);
}
else
{
std::cout << "Uploading monolithically" << std::endl;
std::ifstream ufile(file, std::ios::in | std::ios::binary);
upload_monolithically(ufile, mirror, reference, digest, fsize);
}

std::istringstream is("");
upload_monolithically(is, mirror, reference, EMPTY_SHA, 0);


// CURLHandle check_empty_manifest_exists()

// Now we need to upload the manifest for OCI servers
std::string manifest_url = fmt::format("{}/v2/{}/manifests/{}", mirror.url, reference, tag);
Expand All @@ -51,5 +207,7 @@ oci_upload(OCIMirror& mirror,
.add_header("Content-Type: application/vnd.oci.image.manifest.v1+json")
.upload(manifest_stream);

return mhandle.perform();
auto response = mhandle.perform();
assert(response.http_status == 201);
return response;
}