Move fabric streams to a fabric_streams module
authorNick Vatamaniuc <vatamane@apache.org>
Thu, 20 Dec 2018 17:11:10 +0000 (12:11 -0500)
committerNick Vatamaniuc <nickva@users.noreply.github.com>
Thu, 20 Dec 2018 20:41:55 +0000 (15:41 -0500)
Streams functionality is fairly isolated from the rest of the utils module so
move it to its own. This is mostly in preparation to add a streams workers
cleaner process.

src/couch_replicator/src/couch_replicator_fabric.erl
src/fabric/src/fabric_streams.erl [new file with mode: 0644]
src/fabric/src/fabric_util.erl
src/fabric/src/fabric_view_all_docs.erl
src/fabric/src/fabric_view_changes.erl
src/fabric/src/fabric_view_map.erl
src/fabric/src/fabric_view_reduce.erl

index 6998b28..1650105 100644 (file)
@@ -27,12 +27,12 @@ docs(DbName, Options, QueryArgs, Callback, Acc) ->
            Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_streams:start(Workers0, #shard.ref) of
             {ok, Workers} ->
                 try
                     docs_int(DbName, Workers, QueryArgs, Callback, Acc)
                 after
-                    fabric_util:cleanup(Workers)
+                    fabric_streams:cleanup(Workers)
                 end;
             {timeout, NewState} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
new file mode 100644 (file)
index 0000000..32217c3
--- /dev/null
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_streams).
+
+-export([
+    start/2,
+    start/4,
+    cleanup/1
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+start(Workers, Keypos) ->
+    start(Workers, Keypos, undefined, undefined).
+
+start(Workers0, Keypos, StartFun, Replacements) ->
+    Fun = fun handle_stream_start/3,
+    Acc = #stream_acc{
+        workers = fabric_dict:init(Workers0, waiting),
+        start_fun = StartFun,
+        replacements = Replacements
+    },
+    Timeout = fabric_util:request_timeout(),
+    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
+        {ok, #stream_acc{workers=Workers}} ->
+            true = fabric_view:is_progress_possible(Workers),
+            AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
+                rexi:stream_start(From),
+                [Worker | WorkerAcc]
+            end, [], Workers),
+            {ok, AckedWorkers};
+        Else ->
+            Else
+    end.
+
+
+cleanup(Workers) ->
+    fabric_util:cleanup(Workers).
+
+
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
+    {ok, Workers} ->
+        {ok, St#stream_acc{workers=Workers}};
+    error ->
+        Reason = {nodedown, <<"progress not possible">>},
+        {error, Reason}
+    end;
+
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    Replacements = St#stream_acc.replacements,
+    case {fabric_view:is_progress_possible(Workers), Reason} of
+    {true, _} ->
+        {ok, St#stream_acc{workers=Workers}};
+    {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
+        % Check if we have replacements for this range
+        % and start the new workers if so.
+        case lists:keytake(Worker#shard.range, 1, Replacements) of
+            {value, {_Range, WorkerReplacements}, NewReplacements} ->
+                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+                    NewWorker = (St#stream_acc.start_fun)(Repl),
+                    fabric_dict:store(NewWorker, waiting, NewWorkers)
+                end, Workers, WorkerReplacements),
+                % Assert that our replaced worker provides us
+                % the oppurtunity to make progress.
+                true = fabric_view:is_progress_possible(FinalWorkers),
+                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+                {new_refs, NewRefs, St#stream_acc{
+                    workers=FinalWorkers,
+                    replacements=NewReplacements
+                }};
+            false ->
+                % If we progress isn't possible and we don't have any
+                % replacements then we're dead in the water.
+                Error = {nodedown, <<"progress not possible">>},
+                {error, Error}
+        end;
+    {false, _} ->
+        {error, fabric_util:error_info(Reason)}
+    end;
+
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
+    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
+    undefined ->
+        % This worker lost the race with other partition copies, terminate
+        rexi:stream_cancel(From),
+        {ok, St};
+    waiting ->
+        % Don't ack the worker yet so they don't start sending us
+        % rows until we're ready
+        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
+        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
+        case fabric_dict:any(waiting, Workers1) of
+            true ->
+                {ok, St#stream_acc{workers=Workers1}};
+            false ->
+                {stop, St#stream_acc{workers=Workers1}}
+        end
+    end;
+
+handle_stream_start({ok, ddoc_updated}, _, St) ->
+    cleanup(St#stream_acc.workers),
+    {stop, ddoc_updated};
+
+handle_stream_start(Else, _, _) ->
+    exit({invalid_stream_start, Else}).
index e622c6a..cc1f1b6 100644 (file)
@@ -16,7 +16,6 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2, doc_id_and_rev/1]).
 -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
--export([stream_start/2, stream_start/4]).
 -export([log_timeout/2, remove_done_workers/2]).
 -export([is_users_db/1, is_replicator_db/1, fake_db/2]).
 -export([upgrade_mrargs/1]).
@@ -51,93 +50,6 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
-stream_start(Workers, Keypos) ->
-    stream_start(Workers, Keypos, undefined, undefined).
-
-stream_start(Workers0, Keypos, StartFun, Replacements) ->
-    Fun = fun handle_stream_start/3,
-    Acc = #stream_acc{
-        workers = fabric_dict:init(Workers0, waiting),
-        start_fun = StartFun,
-        replacements = Replacements
-    },
-    Timeout = request_timeout(),
-    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
-        {ok, #stream_acc{workers=Workers}} ->
-            true = fabric_view:is_progress_possible(Workers),
-            AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
-                rexi:stream_start(From),
-                [Worker | WorkerAcc]
-            end, [], Workers),
-            {ok, AckedWorkers};
-        Else ->
-            Else
-    end.
-
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
-    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
-    {ok, Workers} ->
-        {ok, St#stream_acc{workers=Workers}};
-    error ->
-        Reason = {nodedown, <<"progress not possible">>},
-        {error, Reason}
-    end;
-handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
-    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
-    Replacements = St#stream_acc.replacements,
-    case {fabric_view:is_progress_possible(Workers), Reason} of
-    {true, _} ->
-        {ok, St#stream_acc{workers=Workers}};
-    {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
-        % Check if we have replacements for this range
-        % and start the new workers if so.
-        case lists:keytake(Worker#shard.range, 1, Replacements) of
-            {value, {_Range, WorkerReplacements}, NewReplacements} ->
-                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
-                    NewWorker = (St#stream_acc.start_fun)(Repl),
-                    fabric_dict:store(NewWorker, waiting, NewWorkers)
-                end, Workers, WorkerReplacements),
-                % Assert that our replaced worker provides us
-                % the oppurtunity to make progress.
-                true = fabric_view:is_progress_possible(FinalWorkers),
-                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
-                {new_refs, NewRefs, St#stream_acc{
-                    workers=FinalWorkers,
-                    replacements=NewReplacements
-                }};
-            false ->
-                % If we progress isn't possible and we don't have any
-                % replacements then we're dead in the water.
-                Error = {nodedown, <<"progress not possible">>},
-                {error, Error}
-        end;
-    {false, _} ->
-        {error, fabric_util:error_info(Reason)}
-    end;
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
-    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
-    undefined ->
-        % This worker lost the race with other partition copies, terminate
-        rexi:stream_cancel(From),
-        {ok, St};
-    waiting ->
-        % Don't ack the worker yet so they don't start sending us
-        % rows until we're ready
-        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
-        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
-        case fabric_dict:any(waiting, Workers1) of
-            true ->
-                {ok, St#stream_acc{workers=Workers1}};
-            false ->
-                {stop, St#stream_acc{workers=Workers1}}
-        end
-    end;
-handle_stream_start({ok, ddoc_updated}, _, St) ->
-    cleanup(St#stream_acc.workers),
-    {stop, ddoc_updated};
-handle_stream_start(Else, _, _) ->
-    exit({invalid_stream_start, Else}).
-
 recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
 
index 30c8e8d..a404125 100644 (file)
@@ -26,12 +26,12 @@ go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
             Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_streams:start(Workers0, #shard.ref) of
             {ok, Workers} ->
                 try
                     go(DbName, Options, Workers, QueryArgs, Callback, Acc)
                 after
-                    fabric_util:cleanup(Workers)
+                    fabric_streams:cleanup(Workers)
                 end;
             {timeout, NewState} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
index 7288f1a..f96bb05 100644 (file)
@@ -166,7 +166,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
     end,
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
                     LiveSeqs = lists:map(fun(W) ->
@@ -178,7 +178,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
                     send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
                             Callback, AccIn, Timeout)
                 after
-                    fabric_util:cleanup(Workers)
+                    fabric_streams:cleanup(Workers)
                 end;
             {timeout, NewState} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
index b6a3d6f..ee51bfe 100644 (file)
@@ -36,14 +36,14 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, ddoc_updated} ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
                     go(DbName, Workers, VInfo, Args, Callback, Acc)
                 after
-                    fabric_util:cleanup(Workers)
+                    fabric_streams:cleanup(Workers)
                 end;
             {timeout, NewState} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
index a74be10..b2b8a05 100644 (file)
@@ -35,14 +35,14 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
     Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, ddoc_updated} ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
                     go2(DbName, Workers, VInfo, Args, Callback, Acc)
                 after
-                    fabric_util:cleanup(Workers)
+                    fabric_streams:cleanup(Workers)
                 end;
             {timeout, NewState} ->
                 DefunctWorkers = fabric_util:remove_done_workers(