Skip to content

Commit cc225b4

Browse files
authored
Merge pull request #79 from SpringMT/feature/unlock-gvl
Feature/unlock gvl for streaming compression/decompression
2 parents 788f4f5 + 04d74fc commit cc225b4

13 files changed

+142
-58
lines changed

.github/workflows/ruby.yml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,20 @@
77

88
name: Ruby
99

10-
on: [push, pull_request]
10+
on:
11+
push:
12+
branches:
13+
- main
14+
paths-ignore:
15+
- 'README.md'
16+
pull_request:
17+
18+
concurrency:
19+
group: ${{ github.workflow }}-${{ github.ref }}
20+
cancel-in-progress: true
1121

1222
jobs:
1323
test:
14-
1524
runs-on: ubuntu-latest
1625
strategy:
1726
matrix:

benchmarks/multi_thread_comporess.rb renamed to benchmarks/multi_thread_streaming_comporess.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
require 'benchmark/ips'
21
$LOAD_PATH.unshift '../lib'
3-
require 'json'
4-
require 'objspace'
52
require 'zstd-ruby'
63
require 'thread'
74

@@ -19,7 +16,11 @@
1916
THREADS.times.map {
2017
Thread.new {
2118
while str = queue.pop
22-
Zstd.compress(str)
19+
stream = Zstd::StreamingCompress.new
20+
stream << str
21+
res = stream.flush
22+
stream << str
23+
res << stream.finish
2324
end
2425
}
2526
}.each(&:join)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Benchmark script: pushes the same zstd-compressed payload through
# Zstd::StreamingDecompress from several worker threads.
#
# ARGV[0]         name of a sample file located under ./samples/
# ENV['GUESSES']  number of work items put on the queue (default 1000)
# ENV['THREADS']  number of worker threads to start   (default 1)
$LOAD_PATH.unshift '../lib'
require 'zstd-ruby'
require 'thread'

GUESSES = (ENV['GUESSES'] || 1000).to_i
THREADS = (ENV['THREADS'] || 1).to_i

p GUESSES: GUESSES, THREADS: THREADS

sample_file_name = ARGV[0]
# Compress the sample once up front; every work item reuses this frame.
compressed = Zstd.compress(File.read("./samples/#{sample_file_name}"))

jobs = Queue.new
GUESSES.times { jobs << compressed }
# One nil per worker: popping it ends that worker's `while` loop.
THREADS.times { jobs << nil }

workers = THREADS.times.map do
  Thread.new do
    while (frame = jobs.pop)
      decoder = Zstd::StreamingDecompress.new
      # The frame is fed to the same stream twice per work item.
      decoder.decompress(frame)
      decoder.decompress(frame)
    end
  end
end
workers.each(&:join)

benchmarks/zstd_compress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
sample_file_name = ARGV[0]
1212

13-
json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true)
13+
json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true)
1414
json_string = json_data.to_json
1515

1616
i = 0

benchmarks/zstd_decompress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
p "#{ObjectSpace.memsize_of_all/1000} #{ObjectSpace.count_objects} #{`ps -o rss= -p #{Process.pid}`.to_i}"
1010

1111
sample_file_name = ARGV[0]
12-
json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true)
12+
json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true)
1313
json_string = json_data.to_json
1414

1515
i = 0

benchmarks/zstd_streaming_compress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
sample_file_name = ARGV[0]
1212

13-
json_string = IO.read("./samples/#{sample_file_name}")
13+
json_string = File.read("./samples/#{sample_file_name}")
1414

1515
i = 0
1616
start_time = Time.now

benchmarks/zstd_streaming_decompress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
sample_file_name = ARGV[0]
1212

13-
cstr = IO.read("./results/#{sample_file_name}.zstd")
13+
cstr = File.read("./results/#{sample_file_name}.zstd")
1414
i = 0
1515
start_time = Time.now
1616
while true do

examples/sinatra/Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: ../..
33
specs:
4-
zstd-ruby (1.5.6.1)
4+
zstd-ruby (1.5.6.2)
55

66
GEM
77
remote: https://rubygems.org/

ext/zstdruby/common.h

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
#define ZSTD_RUBY_H 1
33

44
#include <ruby.h>
5+
#ifdef HAVE_RUBY_THREAD_H
6+
#include <ruby/thread.h>
7+
#endif
8+
#include <stdbool.h>
59
#include "./libzstd/zstd.h"
610

711
static int convert_compression_level(VALUE compression_level_value)
@@ -12,11 +16,6 @@ static int convert_compression_level(VALUE compression_level_value)
1216
return NUM2INT(compression_level_value);
1317
}
1418

15-
static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
16-
{
17-
return ZSTD_compressStream2(ctx, output, input, endOp);
18-
}
19-
2019
static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs)
2120
{
2221
ID kwargs_keys[2];
@@ -45,6 +44,36 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL
4544
}
4645
}
4746

47+
struct compress_params {
48+
ZSTD_CCtx* ctx;
49+
ZSTD_outBuffer* output;
50+
ZSTD_inBuffer* input;
51+
ZSTD_EndDirective endOp;
52+
size_t ret;
53+
};
54+
55+
static void* compress_wrapper(void* args)
56+
{
57+
struct compress_params* params = args;
58+
params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp);
59+
return NULL;
60+
}
61+
62+
// Thin front end for ZSTD_compressStream2() that can optionally run the
// call through rb_thread_call_without_gvl().
//
// ctx, output, input, endOp: forwarded to ZSTD_compressStream2() unchanged.
// gvl: true  -> call ZSTD_compressStream2() directly on the current thread.
//      false -> when HAVE_RUBY_THREAD_H is defined, run the call via
//               rb_thread_call_without_gvl(); otherwise behaves like true.
//
// Returns whatever ZSTD_compressStream2() returned; callers check it with
// ZSTD_isError().
static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
  if (!gvl) {
    // `ret` is not listed, so it starts out as zero until the wrapper sets it.
    struct compress_params call = {
      .ctx = ctx,
      .output = output,
      .input = input,
      .endOp = endOp,
    };
    // No unblock function is registered (NULL, NULL).
    rb_thread_call_without_gvl(compress_wrapper, &call, NULL, NULL);
    return call.ret;
  }
#else
  (void)gvl;  // flag has no effect without <ruby/thread.h>
#endif
  return ZSTD_compressStream2(ctx, output, input, endOp);
}
76+
4877
static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
4978
{
5079
ID kwargs_keys[1];
@@ -63,4 +92,33 @@ static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
6392
}
6493
}
6594

95+
struct decompress_params {
96+
ZSTD_DCtx* dctx;
97+
ZSTD_outBuffer* output;
98+
ZSTD_inBuffer* input;
99+
size_t ret;
100+
};
101+
102+
static void* decompress_wrapper(void* args)
103+
{
104+
struct decompress_params* params = args;
105+
params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input);
106+
return NULL;
107+
}
108+
109+
// Thin front end for ZSTD_decompressStream() that can optionally run the
// call through rb_thread_call_without_gvl().
//
// dctx, output, input: forwarded to ZSTD_decompressStream() unchanged.
// gvl: true  -> call ZSTD_decompressStream() directly on the current thread.
//      false -> when HAVE_RUBY_THREAD_H is defined, run the call via
//               rb_thread_call_without_gvl(); otherwise behaves like true.
//
// Returns whatever ZSTD_decompressStream() returned; callers check it with
// ZSTD_isError().
static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
  if (!gvl) {
    // `ret` is not listed, so it starts out as zero until the wrapper sets it.
    struct decompress_params call = {
      .dctx = dctx,
      .output = output,
      .input = input,
    };
    // No unblock function is registered (NULL, NULL).
    rb_thread_call_without_gvl(decompress_wrapper, &call, NULL, NULL);
    return call.ret;
  }
#else
  (void)gvl;  // flag has no effect without <ruby/thread.h>
#endif
  return ZSTD_decompressStream(dctx, output, input);
}
123+
66124
#endif /* ZSTD_RUBY_H */

ext/zstdruby/extconf.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
have_func('rb_gc_mark_movable')
44

5-
$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY'
5+
$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread -DDEBUGLEVEL=0'
66
$CPPFLAGS += " -fdeclspec" if CONFIG['CXX'] =~ /clang/
77

88
Dir.chdir File.expand_path('..', __FILE__) do

ext/zstdruby/streaming_compress.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
106106
do {
107107
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
108108

109-
size_t const ret = zstd_compress(sc->ctx, &output, &input, endOp);
109+
size_t const ret = zstd_compress(sc->ctx, &output, &input, endOp, false);
110110
if (ZSTD_isError(ret)) {
111111
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
112112
}
@@ -130,7 +130,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
130130
VALUE result = rb_str_new(0, 0);
131131
while (input.pos < input.size) {
132132
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
133-
size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue);
133+
size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue, false);
134134
if (ZSTD_isError(ret)) {
135135
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
136136
}
@@ -157,7 +157,7 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
157157

158158
while (input.pos < input.size) {
159159
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
160-
size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue);
160+
size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue, false);
161161
if (ZSTD_isError(ret)) {
162162
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
163163
}

ext/zstdruby/streaming_decompress.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
104104
VALUE result = rb_str_new(0, 0);
105105
while (input.pos < input.size) {
106106
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
107-
size_t const ret = ZSTD_decompressStream(sd->dctx, &output, &input);
107+
size_t const ret = zstd_decompress(sd->dctx, &output, &input, false);
108108
if (ZSTD_isError(ret)) {
109109
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
110110
}

ext/zstdruby/zstdruby.c

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ static VALUE rb_compress(int argc, VALUE *argv, VALUE self)
2626
char* input_data = RSTRING_PTR(input_value);
2727
size_t input_size = RSTRING_LEN(input_value);
2828
ZSTD_inBuffer input = { input_data, input_size, 0 };
29+
// ZSTD_compressBound causes SEGV under multi-thread
2930
size_t max_compressed_size = ZSTD_compressBound(input_size);
3031
VALUE buf = rb_str_new(NULL, max_compressed_size);
3132
char* output_data = RSTRING_PTR(buf);
3233
ZSTD_outBuffer output = { (void*)output_data, max_compressed_size, 0 };
3334

34-
size_t const ret = zstd_compress(ctx, &output, &input, ZSTD_e_end);
35+
size_t const ret = zstd_compress(ctx, &output, &input, ZSTD_e_end, true);
3536
if (ZSTD_isError(ret)) {
3637
ZSTD_freeCCtx(ctx);
3738
rb_raise(rb_eRuntimeError, "%s: %s", "compress failed", ZSTD_getErrorName(ret));
@@ -87,19 +88,8 @@ static VALUE rb_compress_using_dict(int argc, VALUE *argv, VALUE self)
8788
}
8889

8990

90-
static VALUE decompress_buffered(const char* input_data, size_t input_size)
91+
static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t input_size)
9192
{
92-
ZSTD_DStream* const dstream = ZSTD_createDStream();
93-
if (dstream == NULL) {
94-
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDStream failed");
95-
}
96-
97-
size_t initResult = ZSTD_initDStream(dstream);
98-
if (ZSTD_isError(initResult)) {
99-
ZSTD_freeDStream(dstream);
100-
rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_initDStream failed", ZSTD_getErrorName(initResult));
101-
}
102-
10393
VALUE output_string = rb_str_new(NULL, 0);
10494
ZSTD_outBuffer output = { NULL, 0, 0 };
10595

@@ -109,15 +99,14 @@ static VALUE decompress_buffered(const char* input_data, size_t input_size)
10999
rb_str_resize(output_string, output.size);
110100
output.dst = RSTRING_PTR(output_string);
111101

112-
size_t readHint = ZSTD_decompressStream(dstream, &output, &input);
113-
if (ZSTD_isError(readHint)) {
114-
ZSTD_freeDStream(dstream);
115-
rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(readHint));
102+
size_t ret = zstd_decompress(dctx, &output, &input, true);
103+
if (ZSTD_isError(ret)) {
104+
ZSTD_freeDCtx(dctx);
105+
rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret));
116106
}
117107
}
118-
119-
ZSTD_freeDStream(dstream);
120108
rb_str_resize(output_string, output.pos);
109+
ZSTD_freeDCtx(dctx);
121110
return output_string;
122111
}
123112

@@ -129,6 +118,11 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self)
129118
StringValue(input_value);
130119
char* input_data = RSTRING_PTR(input_value);
131120
size_t input_size = RSTRING_LEN(input_value);
121+
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
122+
if (dctx == NULL) {
123+
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx failed");
124+
}
125+
set_decompress_params(dctx, kwargs);
132126

133127
unsigned long long const uncompressed_size = ZSTD_getFrameContentSize(input_data, input_size);
134128
if (uncompressed_size == ZSTD_CONTENTSIZE_ERROR) {
@@ -137,15 +131,9 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self)
137131
// ZSTD_decompressStream may be called multiple times when ZSTD_CONTENTSIZE_UNKNOWN, causing slowness.
138132
// Therefore, we will not standardize on ZSTD_decompressStream
139133
if (uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
140-
return decompress_buffered(input_data, input_size);
134+
return decompress_buffered(dctx, input_data, input_size);
141135
}
142136

143-
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
144-
if (dctx == NULL) {
145-
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx failed");
146-
}
147-
set_decompress_params(dctx, kwargs);
148-
149137
VALUE output = rb_str_new(NULL, uncompressed_size);
150138
char* output_data = RSTRING_PTR(output);
151139

@@ -167,35 +155,38 @@ static VALUE rb_decompress_using_dict(int argc, VALUE *argv, VALUE self)
167155
StringValue(input_value);
168156
char* input_data = RSTRING_PTR(input_value);
169157
size_t input_size = RSTRING_LEN(input_value);
170-
unsigned long long const uncompressed_size = ZSTD_getFrameContentSize(input_data, input_size);
171-
if (uncompressed_size == ZSTD_CONTENTSIZE_ERROR) {
172-
rb_raise(rb_eRuntimeError, "%s: %s", "not compressed by zstd", ZSTD_getErrorName(uncompressed_size));
173-
}
174-
if (uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
175-
return decompress_buffered(input_data, input_size);
176-
}
177-
VALUE output = rb_str_new(NULL, uncompressed_size);
178-
char* output_data = RSTRING_PTR(output);
179158

180159
char* dict_buffer = RSTRING_PTR(dict);
181160
size_t dict_size = RSTRING_LEN(dict);
182161
ZSTD_DDict* const ddict = ZSTD_createDDict(dict_buffer, dict_size);
183162
if (ddict == NULL) {
184163
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDDict failed");
185164
}
186-
187165
unsigned const expected_dict_id = ZSTD_getDictID_fromDDict(ddict);
188166
unsigned const actual_dict_id = ZSTD_getDictID_fromFrame(input_data, input_size);
189167
if (expected_dict_id != actual_dict_id) {
190168
ZSTD_freeDDict(ddict);
191-
rb_raise(rb_eRuntimeError, "%s: %s", "DictID mismatch", ZSTD_getErrorName(uncompressed_size));
169+
rb_raise(rb_eRuntimeError, "DictID mismatch");
192170
}
193171

194172
ZSTD_DCtx* const ctx = ZSTD_createDCtx();
195173
if (ctx == NULL) {
196174
ZSTD_freeDDict(ddict);
197175
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx failed");
198176
}
177+
178+
unsigned long long const uncompressed_size = ZSTD_getFrameContentSize(input_data, input_size);
179+
if (uncompressed_size == ZSTD_CONTENTSIZE_ERROR) {
180+
ZSTD_freeDDict(ddict);
181+
ZSTD_freeDCtx(ctx);
182+
rb_raise(rb_eRuntimeError, "%s: %s", "not compressed by zstd", ZSTD_getErrorName(uncompressed_size));
183+
}
184+
if (uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
185+
return decompress_buffered(ctx, input_data, input_size);
186+
}
187+
188+
VALUE output = rb_str_new(NULL, uncompressed_size);
189+
char* output_data = RSTRING_PTR(output);
199190
size_t const decompress_size = ZSTD_decompress_usingDDict(ctx, output_data, uncompressed_size, input_data, input_size, ddict);
200191
if (ZSTD_isError(decompress_size)) {
201192
ZSTD_freeDDict(ddict);

0 commit comments

Comments
 (0)