Skip to content

Commit 54951a7

Browse files
author
Kevin Smith
committed
Landing new MapReduce infrastructure and key filtering code
commit 095cd34f7c33a9fa40acf14e5ddae91b11b06b0f Author: root <root@c3.(none)> Date: Thu Nov 18 16:55:25 2010 -0600 Added map/reduce key filter support to protobuf endpoint. Also fixed a bug in the map phase when keylister sends zero keys as a response batch. commit 1d2d43ba6ecd3afb992333fed664af2915b4b2b3 Author: root <root@c3.(none)> Date: Thu Nov 18 13:31:33 2010 -0600 Fix for multiple instances of same key, different args, for distinct key mapred commit 230a4f6aaaebe4c91a36773288a269fd90ce432a Author: root <root@c3.(none)> Date: Tue Nov 16 15:11:45 2010 -0600 bz://880 exit process with message on exhausted preflist. commit 0913097532b3322abf69c1d308400098488cd5a0 Author: Kevin Smith <[email protected]> Date: Tue Nov 16 13:22:48 2010 -0500 Fixed map input accounting error in riak_kv_mapper commit 5a565e1f52d250701780a8eefbf5dee6e6372051 Author: Kevin Smith <[email protected]> Date: Mon Nov 15 14:47:28 2010 -0500 Removing compiler warning commit 6e85e4ff6d8fbf00142f381ef6f34c57fae34477 Author: Kevin Smith <[email protected]> Date: Thu Nov 11 17:25:21 2010 -0500 Fix for bz basho#871 commit 53cdb17a35928173de5ed94d9231744551dccc7b Author: Kevin Smith <[email protected]> Date: Thu Nov 11 14:23:58 2010 -0500 Include key data when generating map cache keys commit c8fa8bf5cae30a6b596b2714406f8d90bab55831 Author: Kevin Smith <[email protected]> Date: Wed Nov 10 10:20:33 2010 -0500 Excising inno dependencies and fixing bitcask list keys bug in vnode commit 8ce8af4e1f396907e2d64a39f391284ec15f38db Author: Kevin Smith <[email protected]> Date: Wed Nov 10 09:44:57 2010 -0500 Updating vnode to work with bucket-aware list_keys for innostore commit 55d1b1af4d89d46f8a96815107e7971d61fc536b Author: Kevin Smith <[email protected]> Date: Tue Nov 9 21:38:08 2010 -0500 Fixed error where queued mappers were not always dequeued;Made JS VM allocation blocking in mapper commit 319e39f4436c734d2fa4f4d399993e27a01a8ede Merge: 42db485... 4c4561b... Author: Kevin Smith <[email protected]> Date: Tue Nov 9 20:34:17 2010 -0500 Merge branch 'master' of [email protected]:kevsmith/riak_kv-n1 commit 4c4561bc093e2a75da6a6d5ada771ee2463bdf72 Author: root <root@c3.(none)> Date: Tue Nov 9 19:32:21 2010 -0600 {error, notfound} fix commit 42db48523e487f2ec5e5dba6a08cc3002d7ef1c0 Author: Kevin Smith <[email protected]> Date: Tue Nov 9 09:54:48 2010 -0500 Removing dead code commit 53878edcb1e5d679ab7b5df012d864c49309d0ca Author: Kevin Smith <[email protected]> Date: Mon Nov 8 13:57:11 2010 -0500 Removing dead code commit 9a8405e4f54a092f3faa6d37749b683b22015b0b Merge: 568e056... 0093af4... Author: Kevin Smith <[email protected]> Date: Wed Nov 3 14:18:01 2010 -0700 Merge branch 'upstream' commit 568e056783c62dfc502f545170c6acd4fed9926d Merge: 6889ea4... feea65e... Author: Kevin Smith <[email protected]> Date: Wed Nov 3 11:31:46 2010 -0700 Merge branch 'upstream' commit 6889ea4122336f1c1cec1efae80fd45de0bbd982 Author: Kevin Smith <[email protected]> Date: Wed Nov 3 09:55:29 2010 -0700 Re-adding bitcask dep so fresh checkouts compile commit eb711eaa48659b8d51fe8fc7cb4da4c45e2ca300 Author: Kevin Smith <[email protected]> Date: Tue Nov 2 18:41:15 2010 -0700 Fixing repo link to forked innostore commit 19b0aec17cbdaaef2b94e932c6c24aae07f5bf26 Author: Kevin Smith <[email protected]> Date: Tue Nov 2 18:33:38 2010 -0700 Integrating streaming list keys for innostore commit 51ddb34d02809862ff2642284b81f23e4129634b Author: Kevin Smith <[email protected]> Date: Tue Nov 2 18:24:15 2010 -0700 Fixing keylisting to work with filters commit 39d586f7383fed4f409b99b02e0752d8f242d883 Merge: 47863eb... 189e9c3... Author: Kevin Smith <[email protected]> Date: Tue Nov 2 13:59:34 2010 -0700 Merge branch 'upstream' commit 47863eb93cb0a4f6c025c3d14e40c2c62dca19e6 Author: Kevin Smith <[email protected]> Date: Tue Nov 2 12:13:44 2010 -0700 Filter code clean up commit 0aab6882c41fba0f946b51d895301a534959801f Author: Kevin Smith <[email protected]> Date: Mon Nov 1 15:08:50 2010 -0700 Re-enabling discrete input chunking (again) commit 14e65b34e41af1cd6f9b55bd80377d0500bea6f8 Merge: 86eaea8... bee67f7... Author: Kevin Smith <[email protected]> Date: Mon Nov 1 14:55:48 2010 -0700 Merge branch 'develop' Conflicts: src/riak_client.erl commit bee67f75b76fa0c2bc57153e25065bcfaf717aa5 Author: Kevin Smith <[email protected]> Date: Mon Nov 1 11:33:11 2010 -0700 Fixed non-filter mapred queries commit 86eaea81817bbcaeefe40656c9fe6650dad671c6 Author: Kevin Smith <[email protected]> Date: Fri Oct 29 14:51:40 2010 -0700 Breaking discrete mapred inputs into batches of 100 commit 3a2dca9e277b83b3e8d86e5476e53f9cf8de3700 Merge: d1e7074... 753af60... Author: Kevin Smith <[email protected]> Date: Fri Oct 29 14:37:17 2010 -0700 Merge branch 'master' into develop commit 753af60b3482982887d7c49b58920cd4a34478dd Merge: a2e34d1... 64a7f68... Author: Kevin Smith <[email protected]> Date: Fri Oct 29 14:33:32 2010 -0700 Merge commit 'upstream/master' commit d1e707469d4b19247e6a40a9d7e4523cb9b82969 Merge: beccdce... a2e34d1... Author: Kevin Smith <[email protected]> Date: Thu Oct 28 09:33:52 2010 -0700 Merge branch 'master' into develop commit a2e34d122acb2e754b657b75b0e8a33319b63db1 Author: Kevin Smith <[email protected]> Date: Thu Oct 28 09:27:42 2010 -0700 Fixing mapper to only cache list map results commit beccdcebbc31f4a16a471200aca3dc063eed3d59 Author: root <root@c3.(none)> Date: Wed Oct 27 19:26:48 2010 -0500 added exact equality and associated test, fixed set_member to take binaries (and automatically convert them to strings if nec.) commit 5ff25a476a68672a09d0413bae9ea1cfda7c8175 Author: root <root@c3.(none)> Date: Wed Oct 27 17:34:31 2010 -0500 Added logical_and, logical_or, logical_not, greater_than_eq, less_than_eq, build_exprs, urldecode and associated eunit tests. commit 00769e09dd6a79e0e09c57cb4a465ba251387697 Author: Kevin Smith <[email protected]> Date: Wed Oct 27 11:19:36 2010 -0700 More filter wiring commit f045ee88565cbfb91fd5c4c99dce0c3b9f17e8f9 Author: root <root@c3.(none)> Date: Tue Oct 26 21:03:53 2010 -0500 Changed similarity to similar_to commit d11f5364f5abb385386b1ff3e3afea438221883d Author: Kevin Smith <[email protected]> Date: Tue Oct 26 18:36:04 2010 -0700 Making sure key filters propagate correctly commit dfa066f7f6b56e9f4f09086a846c3a4ad2ff3796 Merge: 42b0ccb... 8334e58... Author: Kevin Smith <[email protected]> Date: Tue Oct 26 17:59:41 2010 -0700 Merge branch 'develop' of [email protected]:kevsmith/riak_kv-n1 into develop commit 42b0ccb4d56c6c40dcf25ecac6db8e0f83255e27 Author: Kevin Smith <[email protected]> Date: Tue Oct 26 17:59:13 2010 -0700 Wiring up key selection logic commit 8334e582bb0f5e5ddc5c68239ce03d05cf601035 Author: root <root@c3.(none)> Date: Tue Oct 26 19:07:41 2010 -0500 A few more alternate eunit test cases commit 64fd99ab821120375151e452948c5ca9f6ac19fe Author: root <root@c3.(none)> Date: Tue Oct 26 19:04:32 2010 -0500 Added known false cases to eunit tests commit 8b3f75d7ac5d0acd8a62c87e4bb66bad82094020 Author: root <root@c3.(none)> Date: Tue Oct 26 18:40:31 2010 -0500 First pass at eunit tests. commit 23588603321270ddfb486824b52f2881f9164ccc Author: Kevin Smith <[email protected]> Date: Tue Oct 26 16:28:40 2010 -0700 Wiring up filter functions to key listing commit 5c82a1884f50a46b44186453e3bbf3d833a2e44f Author: root <root@c3.(none)> Date: Tue Oct 26 16:54:37 2010 -0500 added riak_kv_mapred_filters commit 01dba7fb76a2c5ded88b9e52a26d92655f924a8f Merge: 2258d04... 1307d08... Author: root <root@c3.(none)> Date: Tue Oct 26 16:54:14 2010 -0500 Hand merge Merge branch 'develop' of [email protected]:kevsmith/riak_kv-n1 into develop Conflicts: src/riak_kv_mapred_filters.erl commit 2258d04a54e96b9b493f4794f48982fc2acf65fa Author: root <root@c3.(none)> Date: Tue Oct 26 16:48:20 2010 -0500 string_to_float, float_to_string, starts_with, ends_with, inclusive/exclusive between optional parameter, removed substr (use matches) commit 1307d0850d34c318f313e912fd6df19790609c54 Author: Kevin Smith <[email protected]> Date: Tue Oct 26 14:38:32 2010 -0700 Wiring up key filters to JSON parsing commit 534059f2339594741a43923a098cf3242109f265 Author: root <root@c3.(none)> Date: Tue Oct 26 16:17:04 2010 -0500 unlimited number of transforms, tokenize, between commit 5efc9b3d9073496e2e5f9845441d75cc354866b7 Author: root <root@c3.(none)> Date: Tue Oct 26 16:00:06 2010 -0500 similarity, to_string, set_member commit 7d9468e6e75f5195f3b1a9cd81c53efa3f12631e Author: root <root@c3.(none)> Date: Tue Oct 26 15:40:53 2010 -0500 First pass, transforms & filters for riak_kv_mapred_filters (key and bucket filtering) commit ce4ef2097dcf9b507df5d9d96155679bb7f0fa29 Author: Kevin Smith <[email protected]> Date: Mon Oct 25 09:41:13 2010 -0700 Pointing erlang_js dep to the Jaegermonkey repo commit 7becb1f9dca92c5b85f21c45763c15ef648d90d7 Merge: 1b7421c... 0eda2a8... Author: Kevin Smith <[email protected]> Date: Mon Oct 25 09:16:53 2010 -0700 Merge branch 'master' of [email protected]:basho/riak_kv Conflicts: rebar.config commit 1b7421caf01503f0d48ad82af346c76b9f00b69e Author: John Muellerleile <[email protected]> Date: Fri Oct 22 13:10:46 2010 -0500 Fixes for new caching commit b49329b2c766912be3f7e4cc516956520e1471ae Author: John Muellerleile <[email protected]> Date: Fri Oct 22 00:20:38 2010 -0500 Recovery & retry code (untested w/ jager) commit e17d5789078f8a64a6f918371a41015f56606808 Author: Kevin Smith <[email protected]> Date: Thu Oct 21 14:32:45 2010 -0400 Fixing up deps and merge from tip commit 6c21df3bd009b3eba2ed8a8ace1487abbc1e5da0 Merge: 5329dd1... c305b94... Author: Kevin Smith <[email protected]> Date: Thu Oct 21 14:22:19 2010 -0400 Merging tip commit 5329dd1740877949bc3b995ae64cff4e3c3a431a Author: Kevin Smith <[email protected]> Date: Wed Oct 20 16:51:32 2010 -0400 Resetting visibility of ETS tables to public commit cd6b2b2dd7713e3c5f9e4df73bf63d6fcdcdb4a7 Author: Kevin Smith <[email protected]> Date: Wed Oct 20 16:44:02 2010 -0400 Reverting LRU sizing to #/entries vs. memory size. ETS memory accounting is not accurate enough to allow that level of control. commit 71164987fb3af6a402f81c526ed10e1225cb2905 Author: Kevin Smith <[email protected]> Date: Wed Oct 20 14:55:28 2010 -0400 Turning on caching for Erlang map functions commit 3208cf11fcfa690db2e47ffa664b87a4230995ea Author: Kevin Smith <[email protected]> Date: Wed Oct 20 14:40:45 2010 -0400 Adding caching to the JS MapReduce pipeline commit 54600396504f66eec8198f0476a47ac97c043048 Author: Kevin Smith <[email protected]> Date: Tue Oct 19 12:41:42 2010 -0400 Pushing JS dep to jaegermonkey repo commit 7b4153637871e5a383ab38568a9e2da5f1b36b92 Author: Kevin Smith <[email protected]> Date: Thu Oct 14 15:19:35 2010 -0400 Missed monitoring deferred mappers commit e454651af8794c6692addfffd75434efff3b1eab Author: Kevin Smith <[email protected]> Date: Mon Oct 11 18:42:23 2010 -0400 Uniquifying deferred mapper IDs across all cluster nodes commit a1e1289329d518d195baf5cace48bb21853b2975 Author: Kevin Smith <[email protected]> Date: Mon Oct 11 10:28:53 2010 -0400 Tweaking JS VM call timeouts to block commit 0029f83173176c8b728b9800e330371adf737c81 Merge: 87e6af0... dabbe69... Author: Kevin Smith <[email protected]> Date: Mon Oct 11 09:26:06 2010 -0400 Merging latest tip commit 87e6af0ed376ee8db819fb1440dae4cea7e57149 Merge: 4ea62ca... 4761088... Author: Kevin Smith <[email protected]> Date: Fri Oct 8 15:00:10 2010 -0400 Merging latest commit 4ea62cae16d89086dacd0022b84696fc459dbdfe Author: Kevin Smith <[email protected]> Date: Fri Oct 8 14:47:22 2010 -0400 Consolidating mapper_queue and map_master;Tweaking mappers to hold onto JS VM ref for entire mapper run commit 562ad4f56c2ac34c1b93ec1d61aae0b5adbbca0b Author: Kevin Smith <[email protected]> Date: Wed Oct 6 14:30:43 2010 -0400 Making queue mapped tasks obey max pool size commit 142e9b96ece1ada5d0dae4c03c2c4e5ea123e3ad Merge: 5c2a54b... 6d9aba5... Author: Kevin Smith <[email protected]> Date: Tue Oct 5 07:55:36 2010 -0400 Merging latest tip commit 5c2a54b8fc50b4cd12d977f7201346ec472d205d Author: John Muellerleile <[email protected]> Date: Mon Oct 4 15:23:58 2010 -0500 Fixed instances where disjoint key-preflist sets could cause empty keylists in the plan. commit df2b378af99ea63bb3cbde53546706319b3bb425 Merge: 86752bb... 05ed245... Author: Kevin Smith <[email protected]> Date: Mon Oct 4 13:31:36 2010 -0400 Merging latest from tip commit 86752bba168ba1d835049861ed9be14742e3489f Author: Kevin Smith <[email protected]> Date: Mon Oct 4 13:30:47 2010 -0400 Segregating JS usage into separate pools: one for hooks, one for map funs, and one for reduce funs commit 8685af2de4fee55e376d867d11d330492f211c8e Author: Kevin Smith <[email protected]> Date: Sat Oct 2 02:27:51 2010 -0400 Removed erroneous calls to erlang:exit/2 since mapper ids are no longer PIDs commit 5ff4c0d69380e324b7498ff32451d935c3488d5c Author: Kevin Smith <[email protected]> Date: Sat Oct 2 02:27:21 2010 -0400 Replaced file:(write|read)_file with file:(pwrite|read) commit a6b230359eeabf670e1003c0e1689973a41c083c Author: Kevin Smith <[email protected]> Date: Sat Oct 2 02:26:27 2010 -0400 Fixed process start order so mapper_queue can recover from disk on startup commit 1b419712603b68bc6939d3fb356b0b8e269652af Author: Kevin Smith <[email protected]> Date: Fri Oct 1 17:57:39 2010 -0400 First pass at deferring excessive map tasks to disk commit 89c86c9b8ef72b9aa7c047e4c3be600f1276a2bd Author: Kevin Smith <[email protected]> Date: Fri Oct 1 12:43:43 2010 -0400 I suck. commit 7483098bd911013bb9f2ca4965e66f93fd88472a Author: Kevin Smith <[email protected]> Date: Fri Oct 1 12:40:14 2010 -0400 Fixing compile error commit 0afba7191dd87e1a6add6c1f75bda1f1d505d64c Author: Kevin Smith <[email protected]> Date: Fri Oct 1 04:37:48 2010 -0400 Interim fix to handle the case of starting mappers with no inputs commit a860340ed3d23a240d345a027a533447a96aad3b Author: Kevin Smith <[email protected]> Date: Fri Oct 1 04:37:05 2010 -0400 Pulling in latest bitcask dep to pick up fixes commit 638256be7880d8600c1b69250af93b5d1d6e4976 Author: John Muellerleile <[email protected]> Date: Thu Sep 30 18:38:17 2010 -0500 Rewrote planner to use lists instead of digraph for performance. Reverted key list batch size to 100 to balance planning overhead with map performance. This will have to be revisited; map functions which do "heavy lifting" (vs. mapred_verify) will probably have greater benefit from bigger batches, though I haven't measured or done any real analysis yet. This rewrite of the planner was able to finish 500K keys in 44 seconds using 3 nodes using mapred_verify. commit 53b0966dac7db730843c2ec1a9a4a4d5cf2b3d07 Author: Kevin Smith <[email protected]> Date: Thu Sep 30 17:30:21 2010 -0400 Removed all map logic from vnode commit eaa6453c82d222876ed2f78663aad903dee487c8 Author: Kevin Smith <[email protected]> Date: Thu Sep 30 16:47:00 2010 -0400 Wiring up mapper start/stops to riak_kv_stats commit dbfe132c0e9d7661be1054efe09a29da68536075 Author: Kevin Smith <[email protected]> Date: Thu Sep 30 11:37:56 2010 -0400 Adding license headers commit a0d3ea58d18518d85fbda724772a3df238ef11fd Author: Kevin Smith <[email protected]> Date: Thu Sep 30 11:29:36 2010 -0400 Moving M/R out of vnode;Batching data retrieval for map evaluation commit 4d15e2d8446531e1c74f8b912679e98d9dbc5c76 Author: John Muellerleile <[email protected]> Date: Wed Sep 29 13:04:12 2010 -0500 Fix for planner (was not adding partition node, therefore no work being done), reduced streaming key batch size to 1000 commit e796ed9bf8e13e2b0a84efda6d8e459f8056fd7a Author: John Muellerleile <[email protected]> Date: Wed Sep 29 11:44:40 2010 -0500 Proof-of-concept batched "coarse grained" mapreduce processing Bug basho#760 commit 0f9383c1a7672fd91bb90e6a0932f324ae056242 Author: Kevin Smith <[email protected]> Date: Fri Sep 24 21:40:46 2010 -0400 Removing experimental map_phase batching logic commit 5f6f0aebc6278140238826a326390363159a69f4 Author: Kevin Smith <[email protected]> Date: Fri Sep 24 21:14:18 2010 -0400 Cleaning up compiler warnings commit b5fc3c7bcc20c2aa5ed84fabce6529985fadca28 Author: Kevin Smith <[email protected]> Date: Fri Sep 24 12:27:28 2010 -0400 Fixing bad dep commit ab552ff8a75bc8917c46310d3b330e7080e8b201 Author: Kevin Smith <[email protected]> Date: Thu Sep 23 16:05:30 2010 -0400 Adding missing file commit dffa9ee65728979929677a8018be524e24a039db Author: Kevin Smith <[email protected]> Date: Thu Sep 23 15:45:11 2010 -0400 Updating dep commit c258180ff11f636e0db55995958c97db869e24b2 Author: Kevin Smith <[email protected]> Date: Thu Sep 23 15:34:08 2010 -0400 Merging in some preliminary work related to vnode scheduling
1 parent 0093af4 commit 54951a7

29 files changed

+1965
-699
lines changed

ebin/riak_kv.app

+6-2
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,15 @@
3030
riak_kv_keys_fsm,
3131
riak_kv_legacy_vnode,
3232
riak_kv_lru,
33-
riak_kv_map_executor,
33+
riak_kv_map_master,
34+
riak_kv_mapper,
35+
riak_kv_mapper_sup,
3436
riak_kv_map_localphase,
3537
riak_kv_map_phase,
38+
riak_kv_mapred_cache,
39+
riak_kv_mapred_filters,
3640
riak_kv_mapred_json,
41+
riak_kv_mapred_planner,
3742
riak_kv_mapred_query,
3843
riak_kv_mapred_term,
3944
riak_kv_mapreduce,
@@ -65,7 +70,6 @@
6570
riak_core,
6671
luke,
6772
erlang_js,
68-
bitcask,
6973
mochiweb,
7074
webmachine,
7175
os_mon

include/riak_kv_map_phase.hrl

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-record(riak_kv_map_input, {bkey,
2+
bprops,
3+
kd,
4+
preflist}).

include/riak_kv_vnode.hrl

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
bkey :: {binary(), binary()},
1212
req_id :: non_neg_integer()}).
1313

14+
-record(riak_kv_mget_req_v1, {
15+
bkeys :: list({binary(), binary()}),
16+
req_id :: non_neg_integer(),
17+
from :: term()}).
18+
1419
-record(riak_kv_listkeys_req_v1, {
1520
bucket :: binary(),
1621
req_id :: non_neg_integer()}).
@@ -36,6 +41,7 @@
3641

3742
-define(KV_PUT_REQ, #riak_kv_put_req_v1).
3843
-define(KV_GET_REQ, #riak_kv_get_req_v1).
44+
-define(KV_MGET_REQ, #riak_kv_mget_req_v1).
3945
-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v2).
4046
-define(KV_DELETE_REQ, #riak_kv_delete_req_v1).
4147
-define(KV_MAP_REQ, #riak_kv_map_req_v1).

rebar.config

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@
99
"HEAD"}},
1010
{luke, "0.2.2", {git, "git://github.com/basho/luke",
1111
"HEAD"}},
12-
{erlang_js, "0.4.1", {git, "git://github.com/basho/erlang_js",
12+
{erlang_js, "0.5", {git, "git://github.com/kevsmith/erlang_js-jaegermonkey",
1313
"HEAD"}},
1414
{bitcask, "1.1.4", {git, "git://github.com/basho/bitcask",
1515
"HEAD"}},
1616
{ebloom, "1.0.2", {git, "git://github.com/basho/ebloom",
17-
"HEAD"}},
17+
"HEAD"}},
1818
{eper, "0.60", {git, "git://github.com/dizzyd/eper.git",
1919
"eper-0.60"}}
2020
]}.

src/riak_client.erl

+75-13
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,37 @@ mapred(Inputs,Query,Timeout) ->
8484
%% {error, Err :: term()}
8585
%% @doc Perform a map/reduce job across the cluster.
8686
%% See the map/reduce documentation for explanation of behavior.
87-
mapred(Inputs,Query,ResultTransformer,Timeout)
88-
when is_binary(Inputs) ->
89-
mapred_bucket(Inputs, Query, ResultTransformer, Timeout);
87+
mapred(Inputs,Query,ResultTransformer,Timeout) when is_binary(Inputs) orelse
88+
is_tuple(Inputs) ->
89+
case is_binary(Inputs) orelse is_key_filter(Inputs) of
90+
true ->
91+
mapred_bucket(Inputs, Query, ResultTransformer, Timeout);
92+
false ->
93+
Me = self(),
94+
case mapred_stream(Query,Me,ResultTransformer,Timeout) of
95+
{ok, {ReqId, FlowPid}} ->
96+
case is_list(Inputs) of
97+
true ->
98+
add_inputs(FlowPid, Inputs);
99+
false ->
100+
mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout)
101+
end,
102+
luke_flow:finish_inputs(FlowPid),
103+
luke_flow:collect_output(ReqId, Timeout);
104+
Error ->
105+
Error
106+
end
107+
end;
90108
mapred(Inputs,Query,ResultTransformer,Timeout)
91109
when is_list(Query),
92110
(is_integer(Timeout) orelse Timeout =:= infinity) ->
93111
Me = self(),
94112
case mapred_stream(Query,Me,ResultTransformer,Timeout) of
95113
{ok, {ReqId, FlowPid}} ->
96-
if is_list(Inputs) ->
97-
luke_flow:add_inputs(FlowPid, Inputs);
98-
is_tuple(Inputs) ->
114+
case is_list(Inputs) of
115+
true ->
116+
add_inputs(FlowPid, Inputs);
117+
false ->
99118
mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout)
100119
end,
101120
luke_flow:finish_inputs(FlowPid),
@@ -186,7 +205,7 @@ mapred_bucket(Bucket, Query, ResultTransformer, Timeout, ErrorTolerance) ->
186205
%% review of Map/Reduce.
187206
mapred_dynamic_inputs_stream(FSMPid, InputDef, Timeout) ->
188207
case InputDef of
189-
{modfun, Mod, Fun, Options} ->
208+
{modfun, Mod, Fun, Options} ->
190209
Mod:Fun(FSMPid, Options, Timeout);
191210
_ ->
192211
throw({invalid_inputdef, InputDef})
@@ -224,7 +243,7 @@ get(Bucket, Key, R) -> get(Bucket, Key, R, ?DEFAULT_TIMEOUT).
224243
%% @doc Fetch the object at Bucket/Key. Return a value as soon as R
225244
%% nodes have responded with a value or error, or TimeoutMillisecs passes.
226245
get(Bucket, Key, R, Timeout) when is_binary(Bucket), is_binary(Key),
227-
(is_atom(R) or is_integer(R)),
246+
(is_atom(R) or is_integer(R)),
228247
is_integer(Timeout) ->
229248
Me = self(),
230249
ReqId = mk_reqid(),
@@ -237,7 +256,7 @@ get(Bucket, Key, R, Timeout) when is_binary(Bucket), is_binary(Key),
237256
%% {error, timeout} |
238257
%% {error, {n_val_violation, N::integer()}}
239258
%% @doc Store RObj in the cluster.
240-
%% Return as soon as the default W value number of nodes for this bucket
259+
%% Return as soon as the default W value number of nodes for this bucket
241260
%% nodes have received the request.
242261
%% @equiv put(RObj, W, W, default_timeout())
243262
put(RObj) -> put(RObj, default, default, ?DEFAULT_TIMEOUT).
@@ -387,11 +406,16 @@ stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) ->
387406
%% keys in Bucket on any single vnode.
388407
%% If ClientType is set to 'mapred' instead of 'plain', then the
389408
%% messages will be sent in the form of a MR input stream.
390-
stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, ClientType) ->
409+
stream_list_keys(Bucket0, Timeout, ErrorTolerance, Client, ClientType) ->
391410
ReqId = mk_reqid(),
392-
spawn(Node, riak_kv_keys_fsm, start,
393-
[ReqId,Bucket,Timeout,ClientType,ErrorTolerance,Client]),
394-
{ok, ReqId}.
411+
case build_filter(Bucket0) of
412+
{ok, Filter} ->
413+
spawn(Node, riak_kv_keys_fsm, start,
414+
[ReqId,Filter,Timeout,ClientType,ErrorTolerance,Client]),
415+
{ok, ReqId};
416+
Error ->
417+
Error
418+
end.
395419

396420
%% @spec filter_keys(riak_object:bucket(), Fun :: function()) ->
397421
%% {ok, [Key :: riak_object:key()]} |
@@ -508,3 +532,41 @@ wait_for_listkeys(ReqId,Timeout,Acc) ->
508532
after Timeout ->
509533
{error, timeout, Acc}
510534
end.
535+
536+
add_inputs(_FlowPid, []) ->
537+
ok;
538+
add_inputs(FlowPid, Inputs) when length(Inputs) < 100 ->
539+
luke_flow:add_inputs(FlowPid, Inputs);
540+
add_inputs(FlowPid, Inputs) ->
541+
{Current, Next} = lists:split(100, Inputs),
542+
luke_flow:add_inputs(FlowPid, Current),
543+
add_inputs(FlowPid, Next).
544+
545+
is_key_filter({Bucket, Filters}) when is_binary(Bucket),
546+
is_list(Filters) ->
547+
true;
548+
is_key_filter(_) ->
549+
false.
550+
551+
build_filter({Bucket, Exprs}) ->
552+
case build_exprs(Exprs) of
553+
{ok, Filters} ->
554+
{ok, {Bucket, Filters}};
555+
Error ->
556+
Error
557+
end;
558+
build_filter(Bucket) when is_binary(Bucket) ->
559+
{ok, {Bucket, []}}.
560+
561+
build_exprs(Exprs) ->
562+
build_exprs(Exprs, []).
563+
564+
build_exprs([], Accum) ->
565+
{ok, lists:reverse(Accum)};
566+
build_exprs([[FunName|Args]|T], Accum) ->
567+
case riak_kv_mapred_filters:resolve_name(FunName) of
568+
error ->
569+
{error, {bad_filter, FunName}};
570+
Fun ->
571+
build_exprs(T, [{riak_kv_mapred_filters, Fun, Args}|Accum])
572+
end.

src/riak_kv_js_manager.erl

+78-52
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@
2222

2323
%% @doc dispatch work to JavaScript VMs
2424
-module(riak_kv_js_manager).
25+
-author('Kevin Smith <[email protected]>').
26+
-author('John Muellerleile <[email protected]>').
2527

2628
-behaviour(gen_server).
2729

2830
%% API
29-
-export([start_link/1,
30-
add_vm/0,
31-
reload/0,
31+
-export([start_link/2,
32+
add_vm/1,
3233
reload/1,
33-
mark_idle/0,
34-
reserve_vm/0,
35-
dispatch/2,
36-
blocking_dispatch/2,
37-
pool_size/0]).
34+
reload/2,
35+
mark_idle/1,
36+
reserve_vm/1,
37+
reserve_batch_vm/2,
38+
dispatch/3,
39+
blocking_dispatch/3,
40+
pool_size/1]).
3841

3942
%% gen_server callbacks
4043
-export([init/1,
@@ -44,43 +47,44 @@
4447
terminate/2,
4548
code_change/3]).
4649

47-
-define(SERVER, ?MODULE).
48-
4950
-record('DOWN', {ref, type, pid, info}).
5051
-record(vm_state, {pid, needs_reload=false}).
51-
-record(state, {master, idle, reserve}).
52+
-record(state, {name, master, idle, reserve}).
53+
54+
start_link(Name, ChildCount) ->
55+
gen_server:start_link({local, Name}, ?MODULE, [Name, ChildCount], []).
5256

53-
start_link(ChildCount) ->
54-
gen_server:start_link({local, ?SERVER}, ?MODULE, [ChildCount], []).
57+
reload(Name, []) ->
58+
reload(Name).
59+
reload(Name) ->
60+
gen_server:call(Name, reload_vms).
5561

56-
reload([]) ->
57-
reload().
58-
reload() ->
59-
gen_server:call(?SERVER, reload_vms).
62+
add_vm(Name) ->
63+
gen_server:cast(Name, {add_vm, self()}).
6064

61-
add_vm() ->
62-
gen_server:cast(?SERVER, {add_vm, self()}).
65+
mark_idle(Name) ->
66+
gen_server:call(Name, {mark_idle, self()}).
6367

64-
mark_idle() ->
65-
gen_server:call(?SERVER, {mark_idle, self()}).
68+
dispatch(Name, JSCall, Tries) ->
69+
dispatch(Name, JSCall, Tries, Tries).
6670

67-
dispatch(JSCall, Tries) ->
68-
dispatch(JSCall, Tries, Tries).
71+
blocking_dispatch(Name, JSCall, Tries) ->
72+
blocking_dispatch(Name, JSCall, Tries, Tries).
6973

70-
blocking_dispatch(JSCall, Tries) ->
71-
blocking_dispatch(JSCall, Tries, Tries).
74+
reserve_vm(Name) ->
75+
gen_server:call(Name, reserve_vm).
7276

73-
reserve_vm() ->
74-
gen_server:call(?SERVER, reserve_vm).
77+
reserve_batch_vm(Name, Tries) ->
78+
reserve_batch_vm(Name, Tries, Tries).
7579

76-
pool_size() ->
77-
gen_server:call(?SERVER, pool_size).
80+
pool_size(Name) ->
81+
gen_server:call(Name, pool_size).
7882

79-
init([ChildCount]) ->
80-
Master = ets:new(jsvm_master, [private, {keypos, 2}]),
81-
Idle = ets:new(jsvm_idle, [private]),
82-
start_vms(ChildCount),
83-
{ok, #state{master=Master, idle=Idle}}.
83+
init([Name, ChildCount]) ->
84+
Master = ets:new(Name, [private, {keypos, 2}]),
85+
Idle = ets:new(Name, [private]),
86+
start_vms(Name, ChildCount),
87+
{ok, #state{name=Name, master=Master, idle=Idle}}.
8488

8589
handle_call({mark_idle, VM}, _From, #state{master=Master,
8690
idle=Idle}=State) ->
@@ -100,6 +104,16 @@ handle_call(reload_vms, _From, #state{master=Master, idle=Idle}=State) ->
100104
riak_kv_vnode:purge_mapcaches(),
101105
{reply, ok, State};
102106

107+
handle_call(reserve_batch_vm, _From, State) ->
108+
{Reply, State1} = case handle_call(reserve_vm, _From, State) of
109+
{reply, {ok, VM}, NewState} ->
110+
riak_kv_js_vm:start_batch(VM),
111+
{{ok, VM}, NewState};
112+
{reply, Error, NewState} ->
113+
{Error, NewState}
114+
end,
115+
{reply, Reply, State1};
116+
103117
handle_call(reserve_vm, _From, #state{idle=Idle}=State) ->
104118
Reply = case ets:first(Idle) of
105119
'$end_of_table' ->
@@ -159,11 +173,11 @@ is_vm_idle(Idle, VMPid) ->
159173
true
160174
end.
161175

162-
start_vms(0) ->
176+
start_vms(_Pool, 0) ->
163177
ok;
164-
start_vms(Count) ->
165-
riak_kv_js_sup:start_js(self()),
166-
start_vms(Count - 1).
178+
start_vms(Pool, Count) ->
179+
riak_kv_js_sup:start_js(self(), Pool),
180+
start_vms(Pool, Count - 1).
167181

168182
reload_idle_vms(Tid) ->
169183
reload_idle_vms(ets:first(Tid), Tid).
@@ -190,33 +204,45 @@ mark_pending_reloads(VMPid, Master, Idle) ->
190204
end,
191205
mark_pending_reloads(ets:next(Master, VMPid), Master, Idle).
192206

193-
dispatch(_JSCall, _MaxCount, 0) ->
207+
dispatch(_Name, _JSCall, _MaxCount, 0) ->
194208
error_logger:info_msg("JS call failed: All VMs are busy.~n"),
195209
{error, no_vms};
196-
dispatch(JSCall, MaxCount, Count) ->
197-
case reserve_vm() of
210+
dispatch(Name, JSCall, MaxCount, Count) ->
211+
case reserve_vm(Name) of
198212
{ok, VM} ->
199213
JobId = {VM, make_ref()},
200214
riak_kv_js_vm:dispatch(VM, self(), JobId, JSCall),
201215
{ok, JobId};
202216
{error, no_vms} ->
203-
ScalingFactor = (1 + (MaxCount - Count)) *
204-
(0.1 + random:uniform(100) * 0.001),
205-
timer:sleep(erlang:round(500 * ScalingFactor)),
206-
dispatch(JSCall, MaxCount, Count - 1)
217+
back_off(MaxCount, Count),
218+
dispatch(Name, JSCall, MaxCount, Count - 1)
207219
end.
208220

209-
blocking_dispatch(_JSCall, _MaxCount, 0) ->
221+
blocking_dispatch(_Name, _JSCall, _MaxCount, 0) ->
210222
error_logger:info_msg("JS call failed: All VMs are busy.~n"),
211223
{error, no_vms};
212-
blocking_dispatch(JSCall, MaxCount, Count) ->
213-
case reserve_vm() of
224+
blocking_dispatch(Name, JSCall, MaxCount, Count) ->
225+
case reserve_vm(Name) of
214226
{ok, VM} ->
215227
JobId = {VM, make_ref()},
216228
riak_kv_js_vm:blocking_dispatch(VM, JobId, JSCall);
217229
{error, no_vms} ->
218-
ScalingFactor = (1 + (MaxCount - Count)) *
219-
(0.1 + random:uniform(100) * 0.001),
220-
timer:sleep(erlang:round(500 * ScalingFactor)),
221-
blocking_dispatch(JSCall, MaxCount, Count - 1)
230+
back_off(MaxCount, Count),
231+
blocking_dispatch(Name, JSCall, MaxCount, Count - 1)
232+
end.
233+
234+
reserve_batch_vm(_Name, _MaxCount, 0) ->
235+
{error, no_vms};
236+
reserve_batch_vm(Name, MaxCount, Count) ->
237+
case gen_server:call(Name, reserve_batch_vm) of
238+
{error, no_vms} ->
239+
back_off(MaxCount, Count),
240+
reserve_batch_vm(Name, MaxCount, Count - 1);
241+
{ok, VM} ->
242+
{ok, VM}
222243
end.
244+
245+
back_off(MaxCount, Count) ->
246+
ScalingFactor = (1 + (MaxCount - Count)) *
247+
(0.1 + random:uniform(100) * 0.001),
248+
timer:sleep(erlang:round(500 * ScalingFactor)).

0 commit comments

Comments
 (0)