Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add StreamReader cancel read steam #2232

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/async_stream/RequestStreamExampleCtrl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RequestStreamExampleCtrl : public HttpController<RequestStreamExampleCtrl>
[files](const char *data, size_t length) {
if (files->back().tmpName.empty())
{
return;
return false;
}
auto &currentFile = files->back().file;
if (length == 0)
Expand All @@ -109,18 +109,20 @@ class RequestStreamExampleCtrl : public HttpController<RequestStreamExampleCtrl>
currentFile.flush();
currentFile.close();
}
return;
return true;
}
LOG_INFO << "data[" << length << "]: ";
if (currentFile.is_open())
{
LOG_INFO << "write file";
currentFile.write(data, length);
return true;
}
else
{
LOG_ERROR << "file not open";
}
return false;
},
[files, callback = std::move(callback)](std::exception_ptr ex) {
if (ex)
Expand Down
26 changes: 20 additions & 6 deletions lib/src/MultipartStreamParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,37 @@ void drogon::MultipartStreamParser::parse(
}
std::string_view v = buffer_.view();
auto pos = v.find(crlfDashBoundary_);

const auto callDataCB = [&dataCb, this](const char * _data, std::size_t _len) {
if(!dataCb(_data, _len)) {
isValid_ = false;
isFinished_ = true;
exception_type_ = ExceptionType::kServerCancel;
}
};

if (pos == std::string::npos)
{
// boundary not found, leave potential partial boundary
size_t len = v.size() - crlfDashBoundary_.size();
const size_t len = v.size() - crlfDashBoundary_.size();
if (len > 0)
{
dataCb(v.data(), len);
callDataCB(v.data(), len);
buffer_.eraseFront(len);
}
return;
}
// found boundary
dataCb(v.data(), pos);
if (pos > 0)
{
dataCb(v.data() + pos, 0); // notify end of file
if(!isFinished() && isValid()) {
callDataCB(v.data(), pos);

if (pos > 0)
{
// notify end of file
callDataCB(v.data() + pos, 0);
}
}

buffer_.eraseFront(pos + crlfDashBoundary_.size());
status_ = Status::kExpectEndOrNewEntry;
continue;
Expand Down
13 changes: 13 additions & 0 deletions lib/src/MultipartStreamParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ namespace drogon
{
class DROGON_EXPORT MultipartStreamParser
{
public:
enum class ExceptionType
{
kNoException = 0,
kServerCancel = 1,
};

public:
MultipartStreamParser(const std::string &contentType);

Expand All @@ -39,6 +46,10 @@ class DROGON_EXPORT MultipartStreamParser
return isValid_;
}

ExceptionType exceptionType() const {
return exception_type_;
}

private:
const std::string dash_ = "--";
const std::string crlf_ = "\r\n";
Expand Down Expand Up @@ -70,6 +81,8 @@ class DROGON_EXPORT MultipartStreamParser
kExpectEndOrNewEntry = 4,
} status_{Status::kExpectFirstBoundary};

ExceptionType exception_type_{ExceptionType::kNoException};

MultipartHeader currentHeader_;
bool isValid_{true};
bool isFinished_{false};
Expand Down
12 changes: 10 additions & 2 deletions lib/src/RequestStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,17 @@ class MultipartStreamReader : public RequestStreamReader
parser_.parse(data, length, headerCb_, dataCb_);
if (!parser_.isValid())
{
std::exception_ptr exception = nullptr;
// TODO: should we mix stream error and user error?
finishCb_(std::make_exception_ptr(
std::runtime_error("invalid multipart data")));
switch (parser_.exceptionType()) {
case MultipartStreamParser::ExceptionType::kServerCancel:
exception = std::make_exception_ptr(std::runtime_error("server cancelled"));
break;
default:
exception = std::make_exception_ptr(std::runtime_error("invalid multipart data"));
break;
}
finishCb_(exception);
}
else if (parser_.isFinished())
{
Expand Down
1 change: 1 addition & 0 deletions lib/tests/integration_test/server/RequestStreamTestCtrl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class RequestStreamTestCtrl : public HttpController<RequestStreamTestCtrl>
{
ctx->firstFileContent.append(data, length);
}
return true;
},
[ctx, callback = std::move(callback)](std::exception_ptr ex) {
auto resp = HttpResponse::newHttpResponse();
Expand Down
Loading