Skip to content
57 changes: 57 additions & 0 deletions cpp/src/arrow/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ set(ARROW_FLIGHT_SRCS
client_cookie_middleware.cc
client_tracing_middleware.cc
cookie_internal.cc
flight_data_decoder.cc
middleware.cc
serialization_internal.cc
server.cc
Expand Down Expand Up @@ -207,6 +208,21 @@ if(CMAKE_UNITY_BUILD AND WIN32)
PROPERTIES SKIP_UNITY_BUILD_INCLUSION TRUE)
endif()

# Suppress warnings from Abseil headers using deprecated <ciso646> in C++20.
# GCC 15+ with C++20 emits #warning which -Werror turns into an error.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
set(ARROW_FLIGHT_CXX20_WARNING_FLAGS "-Wno-cpp")
set_source_files_properties(server_tracing_middleware.cc
client_tracing_middleware.cc
transport/grpc/grpc_client.cc
transport/grpc/grpc_server.cc
transport/grpc/serialization_internal.cc
transport/grpc/protocol_grpc_internal.cc
transport/grpc/util_internal.cc
PROPERTIES COMPILE_OPTIONS
"${ARROW_FLIGHT_CXX20_WARNING_FLAGS}")
endif()

if(ARROW_WITH_OPENTELEMETRY)
list(APPEND ARROW_FLIGHT_SRCS otel_logging.cc)
endif()
Expand Down Expand Up @@ -320,6 +336,13 @@ if(ARROW_TESTING)
foreach(LIB_TARGET ${ARROW_FLIGHT_TESTING_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING)
endforeach()

# Suppress Abseil <ciso646> warnings in testing library (GCC 15+ with C++20)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
set_source_files_properties(test_auth_handlers.cc test_definitions.cc
test_flight_server.cc test_util.cc
PROPERTIES COMPILE_OPTIONS "-Wno-cpp")
endif()
endif()

add_arrow_test(flight_internals_test
Expand All @@ -334,11 +357,39 @@ add_arrow_test(flight_test
LABELS
"arrow_flight")

# PoC: Async Flight server using gRPC generic callback API
if(ARROW_BUILD_TESTS)
add_arrow_test(async_grpc_poc_test
SOURCES
transport/grpc/async_grpc_poc_test.cc
STATIC_LINK_LIBS
${ARROW_FLIGHT_TEST_LINK_LIBS}
LABELS
"arrow_flight")
endif()

# Suppress Abseil <ciso646> warnings in test files (GCC 15+ with C++20)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
if(TARGET arrow-flight-internals-test)
target_compile_options(arrow-flight-internals-test PRIVATE "-Wno-cpp")
endif()
if(TARGET arrow-flight-test)
target_compile_options(arrow-flight-test PRIVATE "-Wno-cpp")
endif()
if(TARGET arrow-async-grpc-poc-test)
target_compile_options(arrow-async-grpc-poc-test PRIVATE "-Wno-cpp")
endif()
endif()

# Build test server for unit tests or benchmarks
if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
add_executable(flight-test-server test_server.cc)
target_link_libraries(flight-test-server ${ARROW_FLIGHT_TEST_LINK_LIBS}
${GFLAGS_LIBRARIES})
# Suppress Abseil <ciso646> warnings (GCC 15+ with C++20)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
target_compile_options(flight-test-server PRIVATE "-Wno-cpp")
endif()

if(ARROW_BUILD_TESTS)
add_dependencies(arrow-flight-test flight-test-server)
Expand All @@ -365,6 +416,12 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-flight-benchmark ${ARROW_FLIGHT_TEST_LINK_LIBS}
${GFLAGS_LIBRARIES})

# Suppress Abseil <ciso646> warnings (GCC 15+ with C++20)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_STANDARD GREATER_EQUAL 20)
target_compile_options(arrow-flight-perf-server PRIVATE "-Wno-cpp")
target_compile_options(arrow-flight-benchmark PRIVATE "-Wno-cpp")
endif()

add_dependencies(arrow-flight-benchmark arrow-flight-perf-server)

add_dependencies(arrow_flight arrow-flight-benchmark)
Expand Down
135 changes: 135 additions & 0 deletions cpp/src/arrow/flight/flight_data_decoder.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/flight_data_decoder.h"

#include "arrow/flight/serialization_internal.h"
#include "arrow/flight/transport.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace arrow {
namespace flight {

namespace {

// FlightDataMessageReader is an ipc::MessageReader that accepts one at a time
// FlightData messages. Analogous to MessageReader::Open(InputStream*) but for
// individual FlightData messages directly read from the received buffers.
class FlightDataMessageReader : public ipc::MessageReader {
public:
void Push(internal::FlightData data) { data_ = std::move(data); }

::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
if (!data_.metadata) return nullptr;
return data_.OpenMessage();
}

std::shared_ptr<Buffer> ReadAppMetadata() { return data_.app_metadata; }

private:
internal::FlightData data_;
};

} // namespace

class FlightMessageDecoder::FlightMessageDecoderImpl {
public:
FlightMessageDecoderImpl(std::shared_ptr<FlightDataListener> listener,
ipc::IpcReadOptions options)
: listener_(std::move(listener)),
options_(std::move(options)),
message_reader_(new FlightDataMessageReader()) {}

Status Consume(std::shared_ptr<Buffer> buffer) {
ARROW_ASSIGN_OR_RAISE(auto data, internal::DeserializeFlightData(buffer));

if (!data.metadata) {
// Metadata-only message: no IPC content, just Flight app_metadata.
if (data.app_metadata && data.app_metadata->size() > 0) {
FlightStreamChunk chunk;
chunk.app_metadata = std::move(data.app_metadata);
RETURN_NOT_OK(listener_->OnNext(std::move(chunk)));
}
return Status::OK();
}

message_reader_->Push(std::move(data));

if (!batch_reader_) {
// Initialize RecordBatchStreamReader and read the first IPC message.
// It must be a schema.
// RecordBatchStreamReader requiring unique_ptr is slightly awkward
// since we want to keep a reference to the message reader.
ARROW_ASSIGN_OR_RAISE(
batch_reader_,
ipc::RecordBatchStreamReader::Open(
std::unique_ptr<ipc::MessageReader>(message_reader_), options_));
return listener_->OnSchemaDecoded(batch_reader_->schema());
}

std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(batch_reader_->ReadNext(&batch));
auto app_metadata = message_reader_->ReadAppMetadata();

if (batch) {
FlightStreamChunk chunk;
chunk.data = std::move(batch);
chunk.app_metadata = std::move(app_metadata);
return listener_->OnNext(std::move(chunk));
}
// This has to be a Dictionary batch.
// TODO: Add unit test validating assumption.
if (app_metadata && app_metadata->size() > 0) {
FlightStreamChunk chunk;
chunk.app_metadata = std::move(app_metadata);
return listener_->OnNext(std::move(chunk));
}
return Status::OK();
}

std::shared_ptr<Schema> schema() const {
return batch_reader_ ? batch_reader_->schema() : nullptr;
}

private:
std::shared_ptr<FlightDataListener> listener_;
ipc::IpcReadOptions options_;
// This is owned by the RecordBatchStreamReader once it's passed to it.
// We want to keep a reference to it so we can extract the app_metadata.
FlightDataMessageReader* message_reader_;
std::shared_ptr<ipc::RecordBatchStreamReader> batch_reader_;
};

FlightMessageDecoder::FlightMessageDecoder(std::shared_ptr<FlightDataListener> listener,
ipc::IpcReadOptions options)
: impl_(std::make_unique<FlightMessageDecoderImpl>(std::move(listener),
std::move(options))) {}

FlightMessageDecoder::~FlightMessageDecoder() = default;

Status FlightMessageDecoder::Consume(std::shared_ptr<Buffer> buffer) {
return impl_->Consume(std::move(buffer));
}

std::shared_ptr<Schema> FlightMessageDecoder::schema() const { return impl_->schema(); }

} // namespace flight
} // namespace arrow
77 changes: 77 additions & 0 deletions cpp/src/arrow/flight/flight_data_decoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/flight/types.h"
#include "arrow/flight/visibility.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace arrow {
namespace flight {

/// \brief A general listener class to receive events from FlightMessageDecoder
///
/// User must implement callback methods for interested events.
class ARROW_FLIGHT_EXPORT FlightDataListener : public ipc::Listener {
public:
/// \brief Called for each decoded FlightStreamChunk.
///
/// chunk.data is the decoded RecordBatch, or nullptr for metadata-only
/// messages.
virtual Status OnNext(FlightStreamChunk chunk) = 0;
};

/// \brief Push style stream decoder that turns raw arrow Buffers into
/// FlightStreamChunks.
///
/// This class decodes Apache Arrow Flight data format from arrow::Buffer
/// and fires events on the provided FlightDataListener.
class ARROW_FLIGHT_EXPORT FlightMessageDecoder {
public:
explicit FlightMessageDecoder(
std::shared_ptr<FlightDataListener> listener,
ipc::IpcReadOptions options = ipc::IpcReadOptions::Defaults());
~FlightMessageDecoder();

/// \brief Decode one FlightData message directly from abuffer.
///
/// Fires listener->OnSchemaDecoded() on the first message containing
/// a schema, listener->OnNext() for each subsequent record batch,
/// metadata-only message or dictionary batch.
///
/// \param[in] buffer a raw buffer directly from the transport. Example
/// the arrow::Buffer extracted from the grpc::ByteBuffer from the gRPC transport.
/// \return Status
Status Consume(std::shared_ptr<Buffer> buffer);

/// \brief The decoded schema.
///
/// Available after the first Consume() call that contains a schema message.
/// Returns nullptr if no schema has been decoded yet.
std::shared_ptr<Schema> schema() const;

private:
class FlightMessageDecoderImpl;
std::unique_ptr<FlightMessageDecoderImpl> impl_;
};

} // namespace flight
} // namespace arrow
Loading
Loading