Skip to content

[realppl 6] offline ppl evaluation and tests #14852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: wuandy/RealPpl_5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 189 additions & 3 deletions Firestore/Example/Firestore.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Firestore/Source/API/FIRPipelineBridge+Internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ NS_ASSUME_NONNULL_BEGIN

@interface FIROrderingBridge (Internal)

- (std::shared_ptr<api::Ordering>)cppOrderingWithReader:(FSTUserDataReader *)reader;
- (api::Ordering)cppOrderingWithReader:(FSTUserDataReader *)reader;

@end

Expand Down
10 changes: 5 additions & 5 deletions Firestore/Source/API/FIRPipelineBridge.mm
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ - (nonnull id)initWithName:(NSString *)name Args:(nonnull NSArray<FIRExprBridge
@end

@implementation FIROrderingBridge {
std::shared_ptr<Ordering> cpp_ordering;
std::unique_ptr<Ordering> cpp_ordering;
NSString *_direction;
FIRExprBridge *_expr;
Boolean isUserDataRead;
Expand All @@ -197,14 +197,14 @@ - (nonnull id)initWithExpr:(FIRExprBridge *)expr Direction:(NSString *)direction
return self;
}

- (std::shared_ptr<Ordering>)cppOrderingWithReader:(FSTUserDataReader *)reader {
- (Ordering)cppOrderingWithReader:(FSTUserDataReader *)reader {
if (!isUserDataRead) {
cpp_ordering = std::make_shared<Ordering>(
cpp_ordering = std::make_unique<Ordering>(
[_expr cppExprWithReader:reader], Ordering::DirectionFromString(MakeString(_direction)));
}

isUserDataRead = YES;
return cpp_ordering;
return *cpp_ordering;
}

@end
Expand Down Expand Up @@ -610,7 +610,7 @@ - (id)initWithOrderings:(NSArray<id> *)orderings {

- (std::shared_ptr<api::Stage>)cppStageWithReader:(FSTUserDataReader *)reader {
if (!isUserDataRead) {
std::vector<std::shared_ptr<Ordering>> cpp_orderings;
std::vector<Ordering> cpp_orderings;
for (FIROrderingBridge *ordering in _orderings) {
cpp_orderings.push_back([ordering cppOrderingWithReader:reader]);
}
Expand Down
1 change: 1 addition & 0 deletions Firestore/core/src/api/expressions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace api {

Field::Field(std::string name) {
field_path_ = model::FieldPath::FromDotSeparatedString(name);
alias_ = field_path_.CanonicalString();
}

google_firestore_v1_Value Field::to_proto() const {
Expand Down
8 changes: 8 additions & 0 deletions Firestore/core/src/api/ordering.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ class Ordering {
: expr_(expr), direction_(direction) {
}

const Expr* expr() const {
return expr_.get();
}

Direction direction() const {
return direction_;
}

google_firestore_v1_Value to_proto() const;

private:
Expand Down
10 changes: 10 additions & 0 deletions Firestore/core/src/api/realtime_pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ const std::vector<std::shared_ptr<EvaluableStage>>& RealtimePipeline::stages()
return this->stages_;
}

const std::vector<std::shared_ptr<EvaluableStage>>&
RealtimePipeline::rewritten_stages() const {
return this->rewritten_stages_;
}

void RealtimePipeline::SetRewrittentStages(
std::vector<std::shared_ptr<EvaluableStage>> stages) {
this->rewritten_stages_ = std::move(stages);
}

EvaluateContext RealtimePipeline::evaluate_context() {
return EvaluateContext(&serializer_);
}
Expand Down
4 changes: 4 additions & 0 deletions Firestore/core/src/api/realtime_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ class RealtimePipeline {
RealtimePipeline AddingStage(std::shared_ptr<EvaluableStage> stage);

const std::vector<std::shared_ptr<EvaluableStage>>& stages() const;
const std::vector<std::shared_ptr<EvaluableStage>>& rewritten_stages() const;

void SetRewrittentStages(std::vector<std::shared_ptr<EvaluableStage>>);

EvaluateContext evaluate_context();

private:
std::vector<std::shared_ptr<EvaluableStage>> stages_;
std::vector<std::shared_ptr<EvaluableStage>> rewritten_stages_;
remote::Serializer serializer_;
};

Expand Down
58 changes: 55 additions & 3 deletions Firestore/core/src/api/stages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ namespace firebase {
namespace firestore {
namespace api {

CollectionSource::CollectionSource(std::string path)
: path_(model::ResourcePath::FromStringView(path)) {
}

google_firestore_v1_Pipeline_Stage CollectionSource::to_proto() const {
google_firestore_v1_Pipeline_Stage result;

Expand All @@ -50,7 +54,9 @@ google_firestore_v1_Pipeline_Stage CollectionSource::to_proto() const {
result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
result.args[0].which_value_type =
google_firestore_v1_Value_reference_value_tag;
result.args[0].reference_value = nanopb::MakeBytesArray(this->path_);
// TODO: use EncodeResourceName instead
result.args[0].reference_value =
nanopb::MakeBytesArray(this->path_.CanonicalString());

result.options_count = 0;
result.options = nullptr;
Expand Down Expand Up @@ -275,7 +281,7 @@ google_firestore_v1_Pipeline_Stage SortStage::to_proto() const {
result.args = nanopb::MakeArray<google_firestore_v1_Value>(result.args_count);

for (size_t i = 0; i < orders_.size(); ++i) {
result.args[i] = orders_[i]->to_proto();
result.args[i] = orders_[i].to_proto();
}

result.options_count = 0;
Expand Down Expand Up @@ -443,7 +449,20 @@ model::PipelineInputOutputVector CollectionSource::Evaluate(
std::copy_if(inputs.begin(), inputs.end(), std::back_inserter(results),
[this](const model::MutableDocument& doc) {
return doc.is_found_document() &&
doc.key().path().PopLast().CanonicalString() == path_;
doc.key().path().PopLast().CanonicalString() ==
path_.CanonicalString();
});
return results;
}

model::PipelineInputOutputVector CollectionGroupSource::Evaluate(
const EvaluateContext& /*context*/,
const model::PipelineInputOutputVector& inputs) const {
model::PipelineInputOutputVector results;
std::copy_if(inputs.begin(), inputs.end(), std::back_inserter(results),
[this](const model::MutableDocument& doc) {
return doc.is_found_document() &&
doc.key().GetCollectionGroup() == collection_id_;
});
return results;
}
Expand Down Expand Up @@ -492,6 +511,39 @@ model::PipelineInputOutputVector LimitStage::Evaluate(
inputs.begin() + count);
}

model::PipelineInputOutputVector SortStage::Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const {
model::PipelineInputOutputVector input_copy = inputs;
std::sort(
input_copy.begin(), input_copy.end(),
[this, &context](const model::PipelineInputOutput& left,
const model::PipelineInputOutput& right) -> bool {
for (const auto& ordering : this->orders_) {
const auto left_result =
ordering.expr()->ToEvaluable()->Evaluate(context, left);
const auto right_result =
ordering.expr()->ToEvaluable()->Evaluate(context, right);

auto left_val = left_result.IsErrorOrUnset() ? model::MinValue()
: *left_result.value();
auto right_val = right_result.IsErrorOrUnset()
? model::MinValue()
: *right_result.value();
const auto compare_result = model::Compare(left_val, right_val);
if (compare_result != util::ComparisonResult::Same) {
return ordering.direction() == Ordering::ASCENDING
? compare_result == util::ComparisonResult::Ascending
: compare_result == util::ComparisonResult::Descending;
}
}

return false;
});

return input_copy;
}

} // namespace api
} // namespace firestore
} // namespace firebase
58 changes: 51 additions & 7 deletions Firestore/core/src/api/stages.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "Firestore/core/src/api/expressions.h"
#include "Firestore/core/src/api/ordering.h"
#include "Firestore/core/src/model/model_fwd.h"
#include "Firestore/core/src/model/resource_path.h"
#include "Firestore/core/src/nanopb/message.h"
#include "absl/types/optional.h"

Expand Down Expand Up @@ -67,25 +68,29 @@ class EvaluableStage : public Stage {
EvaluableStage() = default;
virtual ~EvaluableStage() = default;

virtual absl::string_view name() const = 0;
virtual model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const = 0;
};

class CollectionSource : public EvaluableStage {
public:
explicit CollectionSource(std::string path) : path_(std::move(path)) {
}
explicit CollectionSource(std::string path);
~CollectionSource() override = default;

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const override {
return "collection";
}

model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const override;

private:
std::string path_;
model::ResourcePath path_;
};

class DatabaseSource : public EvaluableStage {
Expand All @@ -94,12 +99,17 @@ class DatabaseSource : public EvaluableStage {
~DatabaseSource() override = default;

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const override {
return "database";
}

model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const override;
};

class CollectionGroupSource : public Stage {
class CollectionGroupSource : public EvaluableStage {
public:
explicit CollectionGroupSource(std::string collection_id)
: collection_id_(std::move(collection_id)) {
Expand All @@ -108,6 +118,14 @@ class CollectionGroupSource : public Stage {

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const override {
return "collection_group";
}

model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const override;

private:
std::string collection_id_;
};
Expand All @@ -121,6 +139,10 @@ class DocumentsSource : public Stage {

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const {
return "documents";
}

private:
std::vector<std::string> documents_;
};
Expand Down Expand Up @@ -163,6 +185,11 @@ class Where : public EvaluableStage {
~Where() override = default;

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const override {
return "where";
}

model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const override;
Expand Down Expand Up @@ -218,6 +245,11 @@ class LimitStage : public EvaluableStage {
~LimitStage() override = default;

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const override {
return "limit";
}

model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const override;
Expand Down Expand Up @@ -252,17 +284,29 @@ class SelectStage : public Stage {
std::unordered_map<std::string, std::shared_ptr<Expr>> fields_;
};

class SortStage : public Stage {
class SortStage : public EvaluableStage {
public:
explicit SortStage(std::vector<std::shared_ptr<Ordering>> orders)
explicit SortStage(std::vector<Ordering> orders)
: orders_(std::move(orders)) {
}
~SortStage() override = default;

google_firestore_v1_Pipeline_Stage to_proto() const override;

absl::string_view name() const override {
return "sort";
}

model::PipelineInputOutputVector Evaluate(
const EvaluateContext& context,
const model::PipelineInputOutputVector& inputs) const override;

const std::vector<Ordering>& orders() const {
return orders_;
}

private:
std::vector<std::shared_ptr<Ordering>> orders_;
std::vector<Ordering> orders_;
};

class DistinctStage : public Stage {
Expand Down
2 changes: 0 additions & 2 deletions Firestore/core/src/core/expressions_eval.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@

#include "Firestore/core/src/api/expressions.h"
#include "Firestore/core/src/api/stages.h"
#include "Firestore/core/src/model/value_util.h"
#include "Firestore/core/src/nanopb/message.h"
#include "Firestore/core/src/util/hard_assert.h"
#include "absl/types/optional.h"

namespace firebase {
Expand Down
9 changes: 7 additions & 2 deletions Firestore/core/src/core/pipeline_run.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "Firestore/core/src/api/stages.h"
#include "Firestore/core/src/model/mutable_document.h"
#include "Firestore/core/src/remote/serializer.h"
#include "pipeline_util.h"

namespace firebase {
namespace firestore {
Expand All @@ -29,8 +30,12 @@ namespace core {
std::vector<model::MutableDocument> RunPipeline(
api::RealtimePipeline& pipeline,
const std::vector<model::MutableDocument>& inputs) {
auto& current = const_cast<std::vector<model::MutableDocument>&>(inputs);
for (const auto& stage : pipeline.stages()) {
if (pipeline.rewritten_stages().empty()) {
pipeline.SetRewrittentStages(RewriteStages(pipeline.stages()));
}

auto current = std::vector<model::MutableDocument>(inputs);
for (const auto& stage : pipeline.rewritten_stages()) {
current = stage->Evaluate(pipeline.evaluate_context(), current);
}

Expand Down
Loading
Loading