Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
35b77c1
feat: Implement StreamWriter and StructuredIrStreamWriter for IR data…
junhaoliao Apr 20, 2025
3698837
chore: Update .gitmodules to reorder existing entries.
junhaoliao Apr 20, 2025
a6f2748
feat: Add support for msgpack serialization and zstd compression in S…
junhaoliao Apr 21, 2025
075ec8d
Update test.mjs to use new object structure
junhaoliao Apr 21, 2025
9a98e0c
refactor: Remove unused serialization tracking in StructuredIrStreamW…
junhaoliao Apr 21, 2025
9f7a5c0
refactor: Remove unused JS object encoding function from StructuredIr…
junhaoliao Apr 21, 2025
ea38b6b
refactor: Simplify memory view creation in write method of Structured…
junhaoliao Apr 21, 2025
559bfb1
refactor: Remove debug output from flush method in StructuredIrStream…
junhaoliao Apr 21, 2025
74b807b
refactor: Enhance StreamWriter to accept WriterOptions for compressio…
junhaoliao Apr 21, 2025
3e5ada2
refactor: Update StreamWriter documentation to correct writing direct…
junhaoliao Apr 21, 2025
238590b
refactor: Use const for packed_user_gen_handle_length and memoryView …
junhaoliao Apr 21, 2025
d3d0080
refactor: Remove unused Emscripten headers from StructuredIrStreamWriter
junhaoliao Apr 21, 2025
cea226d
refactor: Mark WebStreamWriter class as final to prevent inheritance
junhaoliao Apr 21, 2025
7b426ad
refactor: Remove TODO comment regarding configurable compression leve…
junhaoliao Apr 21, 2025
2bbd1e8
refactor: Update pre-js dependency and modify packing method in Struc…
junhaoliao Apr 21, 2025
e9112bf
refactor: Expect msgpack bytes in StructuredIrStreamWriter::write()
junhaoliao Apr 21, 2025
940f234
refactor: Move msgpackr to devDependencies and update import path in …
junhaoliao Apr 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ build
cmake-build-*
dist

# Node.js
node_modules
package-lock.json

# Generated lint configs
.clang-format
.clang-tidy

# IDEs
.idea
.vscode

9 changes: 6 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
[submodule "src/submodules/fmt"]
path = src/submodules/fmt
url = https://github.com/fmtlib/fmt.git
[submodule "src/submodules/msgpack"]
path = src/submodules/msgpack
url = https://github.com/msgpack/msgpack-c.git
[submodule "src/submodules/spdlog"]
path = src/submodules/spdlog
url = https://github.com/gabime/spdlog.git
[submodule "tools/yscope-dev-utils"]
path = tools/yscope-dev-utils
url = https://github.com/y-scope/yscope-dev-utils.git
[submodule "src/submodules/zstd"]
path = src/submodules/zstd
url = https://github.com/facebook/zstd.git
[submodule "tools/yscope-dev-utils"]
path = tools/yscope-dev-utils
url = https://github.com/y-scope/yscope-dev-utils.git
21 changes: 21 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ endif()

set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/StreamWriter.cpp
src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
src/clp_ffi_js/ir/StructuredIrStreamWriter.cpp
src/clp_ffi_js/ir/StructuredIrUnitHandler.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)
Expand All @@ -119,6 +121,11 @@ set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ReaderInterface.cpp
src/submodules/clp/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
src/submodules/clp/components/core/src/clp/TimestampPattern.cpp

src/submodules/clp/components/core/src/clp/ffi/ir_stream/Serializer.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/encoding_methods.cpp
src/submodules/clp/components/core/src/clp/ir/parsing.cpp
src/submodules/clp/components/core/src/clp/streaming_compression/zstd/Compressor.cpp
)

set(CLP_FFI_JS_SRC_FMT src/submodules/fmt/src/format.cc)
Expand All @@ -134,6 +141,19 @@ set(CLP_FFI_JS_SRC_ZSTD
src/submodules/zstd/lib/decompress/zstd_ddict.c
src/submodules/zstd/lib/decompress/zstd_decompress_block.c
src/submodules/zstd/lib/decompress/zstd_decompress.c

src/submodules/zstd/lib/compress/fse_compress.c
src/submodules/zstd/lib/compress/hist.c
src/submodules/zstd/lib/compress/huf_compress.c
src/submodules/zstd/lib/compress/zstd_compress.c
src/submodules/zstd/lib/compress/zstd_compress_literals.c
src/submodules/zstd/lib/compress/zstd_compress_sequences.c
src/submodules/zstd/lib/compress/zstd_compress_superblock.c
src/submodules/zstd/lib/compress/zstd_double_fast.c
src/submodules/zstd/lib/compress/zstd_fast.c
src/submodules/zstd/lib/compress/zstd_lazy.c
src/submodules/zstd/lib/compress/zstd_ldm.c
src/submodules/zstd/lib/compress/zstd_opt.c
)

set(CLP_FFI_JS_SUPPORTED_ENVIRONMENTS
Expand Down Expand Up @@ -183,6 +203,7 @@ Link options: ${CLP_FFI_JS_LINK_OPTIONS}."
src/submodules/clp/components/core/src/clp
src/submodules/clp/components/core/submodules
src/submodules/fmt/include
src/submodules/msgpack/include
src/submodules/spdlog/include
src/submodules/zstd/lib
)
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@
"import": "./dist/ClpFfiJs-worker.js",
"types": "./dist/ClpFfiJs-worker.d.ts"
}
},
"devDependencies": {
"msgpackr": "^1.11.2"
}
}
28 changes: 28 additions & 0 deletions src/clp_ffi_js/ir/StreamWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include <emscripten/bind.h>
#include <emscripten/val.h>

#include <clp_ffi_js/ir/StreamWriter.hpp>
#include <clp_ffi_js/ir/StructuredIrStreamWriter.hpp>

namespace {
// Embind registrations that expose `StreamWriter` to JavaScript.
EMSCRIPTEN_BINDINGS(ClpStreamWriter) {
    namespace ir = clp_ffi_js::ir;

    // JS types used as inputs
    emscripten::register_type<ir::WriterOptions>("{compressionLevel: number | undefined}");

    // `StreamWriter::create` serves as the JS constructor and the returned instance is handed over
    // with the `take_ownership` return-value policy. Only `write` and `close` are exported;
    // `flush` is not bound here.
    emscripten::class_<ir::StreamWriter>("StreamWriter")
            .constructor(
                    &ir::StreamWriter::create,
                    emscripten::return_value_policy::take_ownership()
            )
            .function("write", &ir::StreamWriter::write)
            .function("close", &ir::StreamWriter::close);
}
} // namespace

namespace clp_ffi_js::ir {
// Factory for `StreamWriter`; currently always builds a `StructuredIrStreamWriter`.
auto StreamWriter::create(emscripten::val const& stream, WriterOptions const& writer_options)
        -> std::unique_ptr<StreamWriter> {
    std::unique_ptr<StreamWriter> writer{
            std::make_unique<StructuredIrStreamWriter>(stream, writer_options)
    };
    return writer;
}
} // namespace clp_ffi_js::ir
62 changes: 62 additions & 0 deletions src/clp_ffi_js/ir/StreamWriter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#ifndef CLP_FFI_JS_IR_STREAMWRITER_HPP
#define CLP_FFI_JS_IR_STREAMWRITER_HPP

#include <clp/ffi/ir_stream/Serializer.hpp>
#include <clp/ir/types.hpp>
#include <emscripten/val.h>

namespace clp_ffi_js::ir {
// JS types used as inputs
// `WriterOptions` is the C++ handle type for the options object passed in from JS; it is read
// with `emscripten::val`-style accessors (`hasOwnProperty`, `operator[]`).
EMSCRIPTEN_DECLARE_VAL_TYPE(WriterOptions);

/**
 * Abstract base class for the stream writers exposed to JavaScript. Instances are obtained through
 * `create()` and can be neither copied nor moved.
 */
class StreamWriter {
public:
    // Serializer type shared by implementations (four-byte encoded variables).
    using ClpIrSerializer = clp::ffi::ir_stream::Serializer<clp::ir::four_byte_encoded_variable_t>;
    using BufferView = ClpIrSerializer::BufferView;

    /**
     * Creates a `StreamWriter` that writes to the given output stream.
     *
     * @param stream The JS stream object to write to.
     * @param writer_options The JS options object configuring the writer.
     * @return The created instance.
     * @throw ClpFfiJsException if any error occurs.
     */
    [[nodiscard]] static auto
    create(emscripten::val const& stream, WriterOptions const& writer_options)
            -> std::unique_ptr<StreamWriter>;

    // Destructor
    virtual ~StreamWriter() = default;

    // Copying and moving are disabled.
    StreamWriter(StreamWriter const&) = delete;
    auto operator=(StreamWriter const&) -> StreamWriter& = delete;
    StreamWriter(StreamWriter&&) = delete;
    auto operator=(StreamWriter&&) -> StreamWriter& = delete;

    /**
     * FIXME: consider separation.
     * Writes a passed chunk of data to a WritableStream and its underlying sink.
     *
     * @param chunk The JS value holding the data to write.
     */
    virtual auto write(emscripten::val chunk) -> void = 0;

    /**
     * FIXME: look into integrating this with `WritableStreamDefaultWriter.ready`
     * Flushes the underlying IR buffer and the output stream.
     */
    virtual auto flush() -> void = 0;

    /**
     * Closes the writer by writing the buffered results into the output stream. The stream is
     * meant to end with an end-of-stream IR unit.
     *
     * Nothing is returned; how failures are reported is up to the implementation.
     */
    virtual auto close() -> void = 0;

protected:
    // Only subclasses may construct the base.
    // TODO: add docs
    StreamWriter() = default;
};
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_STREAMWRITER_HPP
143 changes: 143 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#include <cstddef>
#include <format>
#include <memory>
#include <string_view>
#include <utility>

#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <msgpack.hpp>
#include <spdlog/spdlog.h>
#include <streaming_compression/zstd/Compressor.hpp>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/StreamWriter.hpp>
#include <clp_ffi_js/ir/StructuredIrStreamWriter.hpp>

namespace clp_ffi_js::ir {
namespace {
// Name of the optional compression-level property looked up on the JS `WriterOptions` object.
constexpr std::string_view cWriterOptionsCompressionLevel{"compressionLevel"};

class WebStreamWriter final: public clp::WriterInterface {
public:
// Delete default constructor to disable direct instantiation.
WebStreamWriter() = delete;

explicit WebStreamWriter(emscripten::val stream)
: WriterInterface{},
m_writer{stream.call<emscripten::val>("getWriter")} {}

// Delete copy & move constructors and assignment operators
WebStreamWriter(WebStreamWriter const&) = delete;
WebStreamWriter(WebStreamWriter&&) = delete;
auto operator=(WebStreamWriter const&) -> WebStreamWriter& = delete;
auto operator=(WebStreamWriter&&) -> WebStreamWriter& = delete;

// Destructor
~WebStreamWriter() override = default;

void write(char const* data, size_t data_length) override {
auto const uint8Array{emscripten::val::global("Uint8Array").new_(data_length)};
emscripten::val memoryView{emscripten::typed_memory_view(data_length, data)};

uint8Array.call<void>("set", memoryView);
m_writer.call<void>("write", uint8Array);
}

void flush() override { return; }

clp::ErrorCode try_seek_from_begin(size_t pos) override { return clp::ErrorCode_Unsupported; }

clp::ErrorCode try_seek_from_current(off_t offset) override {
return clp::ErrorCode_Unsupported;
}

clp::ErrorCode try_get_pos(size_t& pos) const override { return clp::ErrorCode_Unsupported; }

private:
emscripten::val m_writer;
};
} // namespace

/**
 * Sets up the output chain: a JS-stream-backed writer, a zstd compressor opened on top of it, and
 * the IR serializer whose buffer is later written through the compressor.
 *
 * @param stream JS stream object to which the compressed bytes are written.
 * @param writer_options JS options object. If it has a `compressionLevel` property holding a
 * value other than `undefined`/`null`, that value is used as the compression level; otherwise the
 * default compression level is used.
 * @throw ClpFfiJsException if the IR serializer cannot be created.
 */
StructuredIrStreamWriter::StructuredIrStreamWriter(
        emscripten::val const& stream,
        WriterOptions const& writer_options
)
        : StreamWriter{},
          m_output_writer{std::make_unique<WebStreamWriter>(stream)} {
    int compression_level{clp::streaming_compression::zstd::cDefaultCompressionLevel};
    if (writer_options.hasOwnProperty(cWriterOptionsCompressionLevel.data())) {
        // The registered JS type is `number | undefined`, so the key can be present without a
        // usable value. Converting `undefined`/`null` to `int` is not meaningful, so only convert
        // when a value was actually supplied.
        auto const requested_level{writer_options[cWriterOptionsCompressionLevel.data()]};
        if (false == requested_level.isUndefined() && false == requested_level.isNull()) {
            compression_level = requested_level.as<int>();
        }
    }

    m_msgpack_buf.reserve(cDefaultMsgpackBufferSizeLimit);

    m_writer = std::make_unique<clp::streaming_compression::zstd::Compressor>();
    m_writer->open(*m_output_writer, compression_level);

    auto serializer_result{ClpIrSerializer::create()};
    if (serializer_result.has_error()) {
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Failure,
                __FILENAME__,
                __LINE__,
                std::format(
                        "Failed to create serializer: {} {}",
                        serializer_result.error().category().name(),
                        serializer_result.error().message()
                )
        };
    }
    m_serializer = std::make_unique<ClpIrSerializer>(std::move(serializer_result.value()));
}

/**
 * Serializes one msgpack-encoded log event into the IR buffer, writing the buffer to the output
 * stream once it grows past `cDefaultIrBufferSizeLimit`.
 *
 * @param chunk JS array-like of msgpack bytes (its `length` property is read and its contents are
 * copied with `set`); the bytes must decode to a msgpack map.
 * @throw ClpFfiJsException if the writer has been closed, if the decoded msgpack object is not a
 * map, or if serialization fails.
 * @throw Any exception raised by `msgpack::unpack` on malformed input.
 */
auto StructuredIrStreamWriter::write(emscripten::val chunk) -> void {
    // `close()` resets the serializer; using it afterwards would dereference a null pointer.
    if (nullptr == m_serializer) {
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Failure,
                __FILENAME__,
                __LINE__,
                std::format("Failed to write: the stream writer is closed")
        };
    }

    // Copy the JS bytes into `m_msgpack_buf`. The buffer is resized before the memory view is
    // created so that the view covers the final storage.
    auto const packed_user_gen_handle_length{static_cast<size_t>(chunk["length"].as<int>())};
    m_msgpack_buf.resize(packed_user_gen_handle_length);
    emscripten::val const memory_view{
            emscripten::typed_memory_view(packed_user_gen_handle_length, m_msgpack_buf.data())
    };
    memory_view.call<void>("set", chunk);

    auto const unpacked_user_gen_handle{msgpack::unpack(
            reinterpret_cast<char const*>(m_msgpack_buf.data()),
            m_msgpack_buf.size()
    )};
    m_msgpack_buf.clear();

    // `via` is a union: reading `via.map` is only valid when the object really is a map.
    auto const& unpacked_user_gen_obj{unpacked_user_gen_handle.get()};
    if (msgpack::type::MAP != unpacked_user_gen_obj.type) {
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Failure,
                __FILENAME__,
                __LINE__,
                std::format("Failed to serialize: the msgpack chunk is not a map")
        };
    }
    auto const unpacked_user_gen_map{unpacked_user_gen_obj.via.map};

    // FIXME: this should come from the arg 'chunk' as well
    msgpack::object_map auto_gen_map{0, nullptr};

    auto const serializer_result{
            m_serializer->serialize_msgpack_map(auto_gen_map, unpacked_user_gen_map)
    };
    if (false == serializer_result) {
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Failure,
                __FILENAME__,
                __LINE__,
                std::format("Failed to serialize msgpack map")
        };
    }

    if (cDefaultIrBufferSizeLimit < get_ir_buf_size()) {
        write_ir_buf_to_output_stream();
    }
}

/**
 * Writes the buffered IR to the compressor and then flushes the compressor.
 *
 * @throw ClpFfiJsException if the writer has already been closed.
 */
auto StructuredIrStreamWriter::flush() -> void {
    // `close()` resets the serializer; flushing afterwards would dereference a null pointer.
    if (nullptr == m_serializer) {
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Failure,
                __FILENAME__,
                __LINE__,
                std::format("Failed to flush: the stream writer is closed")
        };
    }

    write_ir_buf_to_output_stream();
    m_writer->flush();
}

auto StructuredIrStreamWriter::close() -> void {
write_ir_buf_to_output_stream();
m_writer->close();

// FIXME: handle any read on this after close()
m_serializer.reset(nullptr);
}

// Hands the serializer's current IR buffer contents to the compressor, then empties the buffer.
auto StructuredIrStreamWriter::write_ir_buf_to_output_stream() const -> void {
    auto const pending_ir{m_serializer->get_ir_buf_view()};
    auto const* const pending_bytes{reinterpret_cast<char const*>(pending_ir.data())};
    m_writer->write(pending_bytes, pending_ir.size());
    m_serializer->clear_ir_buf();
}
} // namespace clp_ffi_js::ir
55 changes: 55 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamWriter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#ifndef CLP_FFI_JS_IR_STRUCTUREDIRSTREAMWRITER_HPP
#define CLP_FFI_JS_IR_STRUCTUREDIRSTREAMWRITER_HPP

#include <clp/streaming_compression/zstd/Compressor.hpp>
#include <clp/WriterInterface.hpp>

#include <clp_ffi_js/ir/StreamWriter.hpp>

namespace clp_ffi_js::ir {
/**
 * `StreamWriter` implementation that serializes msgpack-encoded log events with the CLP IR
 * serializer and writes the result through a zstd compressor to a JS output stream.
 */
class StructuredIrStreamWriter : public StreamWriter {
public:
    /**
     * The default Msgpack buffer size limit (used as the initial capacity reserved for the
     * msgpack input buffer).
     */
    static constexpr size_t cDefaultMsgpackBufferSizeLimit{4096};

    /**
     * The default IR buffer size limit; `write` sends the IR buffer to the output stream once the
     * buffer is larger than this.
     */
    static constexpr size_t cDefaultIrBufferSizeLimit{65'536};

    /**
     * @param stream JS stream object to write to.
     * @param writer_options JS options object for the writer.
     */
    StructuredIrStreamWriter(emscripten::val const& stream, WriterOptions const& writer_options);

    // A stream and options are always required, so there is no default constructor.
    StructuredIrStreamWriter() = delete;

    // Copying and moving are disabled.
    StructuredIrStreamWriter(StructuredIrStreamWriter const&) = delete;
    auto operator=(StructuredIrStreamWriter const&) -> StructuredIrStreamWriter& = delete;
    StructuredIrStreamWriter(StructuredIrStreamWriter&&) = delete;
    auto operator=(StructuredIrStreamWriter&&) -> StructuredIrStreamWriter& = delete;

    // Destructor
    ~StructuredIrStreamWriter() override = default;

    // Methods implementing `StreamWriter`
    auto write(::emscripten::val chunk) -> void override;
    auto flush() -> void override;
    auto close() -> void override;

private:
    // Writes the serializer's IR buffer to `m_writer` and clears the buffer.
    auto write_ir_buf_to_output_stream() const -> void;

    // Returns the number of bytes currently held in the serializer's IR buffer.
    [[nodiscard]] auto get_ir_buf_size() const -> size_t {
        return m_serializer->get_ir_buf_view().size();
    }

    // Variables
    // The declaration order is kept as-is: `m_output_writer` is declared before `m_writer`, which
    // is opened on it, so the compressor is destroyed first.
    std::unique_ptr<clp::WriterInterface> m_output_writer;
    std::unique_ptr<clp::streaming_compression::zstd::Compressor> m_writer;
    std::unique_ptr<ClpIrSerializer> m_serializer;

    // Scratch buffer receiving the msgpack bytes copied in from JS.
    // NOTE(review): `u_int8_t` is a non-standard typedef; consider `std::uint8_t` from <cstdint>.
    std::vector<u_int8_t> m_msgpack_buf;
};
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_STRUCTUREDIRSTREAMWRITER_HPP
1 change: 1 addition & 0 deletions src/submodules/msgpack
Submodule msgpack added at 919908
Loading
Loading