Clean rexi stream workers when coordinator process is killed
authorNick Vatamaniuc <vatamane@apache.org>
Thu, 20 Dec 2018 17:19:01 +0000 (12:19 -0500)
committerNick Vatamaniuc <nickva@users.noreply.github.com>
Thu, 20 Dec 2018 20:41:55 +0000 (15:41 -0500)
Sometimes fabric coordinators end up getting brutally terminated [1], and in that
case they might never process their `after` clause where their remote rexi
workers are killed. Those workers are left lingering around keeping databases
active for up to 5 minutes at a time.

To prevent that from happening, let coordinators which use streams spawn an
auxiliary cleaner process. This process will monitor the main coordinator and
if it dies will ensure remote workers are killed, freeing resources
immediately. In order not to send 2x the number of kill messages during the
normal exit, fabric_util:cleanup() will stop the auxiliary process before
continuing.

[1] One instance is when the ddoc cache is refreshed:
 https://github.com/apache/couchdb/blob/master/src/ddoc_cache/src/ddoc_cache_entry.erl#L236

src/fabric/src/fabric_streams.erl

index 32217c3..ae0c2be 100644 (file)
@@ -22,6 +22,9 @@
 -include_lib("mem3/include/mem3.hrl").
 
 
+-define(WORKER_CLEANER, fabric_worker_cleaner).
+
+
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
 
@@ -32,6 +35,7 @@ start(Workers0, Keypos, StartFun, Replacements) ->
         start_fun = StartFun,
         replacements = Replacements
     },
+    spawn_worker_cleaner(self(), Workers0),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{workers=Workers}} ->
@@ -47,6 +51,16 @@ start(Workers0, Keypos, StartFun, Replacements) ->
 
 
 cleanup(Workers) ->
+    % Stop the auxiliary cleaner process as we got to the point where cleanup
+    % happens in the regular fashion, so we don't want to send 2x the number of
+    % kill messages (once from here, once from the cleaner's 'DOWN' handler).
+    case get(?WORKER_CLEANER) of
+        CleanerPid when is_pid(CleanerPid) ->
+            % Erase first so a repeat call cannot target a dead pid
+            erase(?WORKER_CLEANER),
+            exit(CleanerPid, kill);
+        _ ->
+            % No cleaner was spawned in this process; nothing to stop
+            ok
+    end,
     fabric_util:cleanup(Workers).
 
 
@@ -72,6 +86,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
             {value, {_Range, WorkerReplacements}, NewReplacements} ->
                 FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
                     NewWorker = (St#stream_acc.start_fun)(Repl),
+                    add_worker_to_cleaner(self(), NewWorker),
                     fabric_dict:store(NewWorker, waiting, NewWorkers)
                 end, Workers, WorkerReplacements),
                 % Assert that our replaced worker provides us
@@ -117,3 +132,120 @@ handle_stream_start({ok, ddoc_updated}, _, St) ->
 
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
+
+
+% Spawn an auxiliary rexi worker cleaner. This will be used in cases
+% when the coordinator (request) process is forcibly killed and doesn't
+% get a chance to process its `after` fabric_util:cleanup/1 clause.
+% Idempotent per process: the cleaner pid is cached in the process
+% dictionary so repeated calls reuse the existing cleaner.
+spawn_worker_cleaner(Coordinator, Workers) ->
+    case get(?WORKER_CLEANER) of
+        undefined ->
+            % Monitor the coordinator; cleanup fires only on its 'DOWN'
+            Pid = spawn(fun() ->
+                erlang:monitor(process, Coordinator),
+                cleaner_loop(Coordinator, Workers)
+            end),
+            put(?WORKER_CLEANER, Pid),
+            Pid;
+        ExistingCleaner ->
+            ExistingCleaner
+    end.
+
+
+% Loop run by the auxiliary cleaner process. Accumulates workers added
+% after spawn time and, when the monitored coordinator `Pid` goes down,
+% kills the tracked remote rexi workers and exits.
+cleaner_loop(Pid, Workers) ->
+    receive
+        {add_worker, Pid, Worker} ->
+            % A replacement worker was started; track it for cleanup too.
+            % Matching on `Pid` ignores add_worker messages for other
+            % coordinators.
+            cleaner_loop(Pid, [Worker | Workers]);
+        {'DOWN', _, _, Pid, _} ->
+            % Coordinator died without running its own cleanup; free the
+            % remote resources now, then let this process end normally.
+            fabric_util:cleanup(Workers)
+    end.
+
+
+% Register an additional worker with this process' cleaner (if one was
+% spawned via spawn_worker_cleaner/2) so it is also killed should the
+% coordinator die. No-op when no cleaner exists in the process dictionary.
+add_worker_to_cleaner(CoordinatorPid, Worker) ->
+    case get(?WORKER_CLEANER) of
+        CleanerPid when is_pid(CleanerPid) ->
+            CleanerPid ! {add_worker, CoordinatorPid, Worker};
+        _ ->
+            ok
+    end.
+
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+% EUnit generator: runs the cleaner tests under a shared setup/teardown
+% fixture that mocks rexi:kill/2 (see setup/0 and teardown/1 below).
+worker_cleaner_test_() ->
+    {
+        "Fabric spawn_worker_cleaner test", {
+            setup, fun setup/0, fun teardown/1,
+            fun(_) -> [
+                should_clean_workers(),
+                does_not_fire_if_cleanup_called(),
+                should_clean_additional_worker_too()
+            ] end
+        }
+    }.
+
+
+% Verify the cleaner kills every tracked worker when the coordinator dies.
+should_clean_workers() ->
+    ?_test(begin
+        meck:reset(rexi),
+        erase(?WORKER_CLEANER),
+        Workers = [
+            #shard{node = 'n1', ref = make_ref()},
+            #shard{node = 'n2', ref = make_ref()}
+        ],
+        % Stand-in coordinator that exits when sent `die`
+        {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+        Cleaner = spawn_worker_cleaner(Coord, Workers),
+        Ref = erlang:monitor(process, Cleaner),
+        Coord ! die,
+        % Wait for the cleaner to finish before counting kill calls
+        receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+        ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+    end).
+
+
+% Verify that explicitly calling cleanup/1 stops the auxiliary cleaner so
+% workers are not killed twice during a normal (non-crash) exit.
+does_not_fire_if_cleanup_called() ->
+    ?_test(begin
+        meck:reset(rexi),
+        erase(?WORKER_CLEANER),
+        Workers = [
+            #shard{node = 'n1', ref = make_ref()},
+            #shard{node = 'n2', ref = make_ref()}
+        ],
+        {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+        Cleaner = spawn_worker_cleaner(Coord, Workers),
+        Ref = erlang:monitor(process, Cleaner),
+        % Regular cleanup path: should kill the cleaner process itself
+        cleanup(Workers),
+        Coord ! die,
+        receive {'DOWN', Ref, _, _, _} -> ok end,
+        % 2 calls would be from cleanup/1 function. If cleanup process fired
+        % too it would have been 4 calls total.
+        ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+    end).
+
+
+% Verify a worker registered after spawn (via add_worker_to_cleaner/2)
+% is also killed when the coordinator dies.
+should_clean_additional_worker_too() ->
+    ?_test(begin
+        meck:reset(rexi),
+        erase(?WORKER_CLEANER),
+        Workers = [
+            #shard{node = 'n1', ref = make_ref()}
+        ],
+        {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+        Cleaner = spawn_worker_cleaner(Coord, Workers),
+        % Late-added worker; cleaner should now track two workers total
+        add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
+        Ref = erlang:monitor(process, Cleaner),
+        Coord ! die,
+        receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+        ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+    end).
+
+
+% Fixture setup: stub rexi:kill/2 so tests can count kill attempts
+% without any remote nodes.
+setup() ->
+    ok = meck:expect(rexi, kill, fun(_, _) -> ok end).
+
+
+% Fixture teardown: remove all meck mocks installed by setup/0.
+teardown(_) ->
+    meck:unload().
+
+-endif.