Handle no_pass message when limit is 0 83/head
authorRobert Newson <rnewson@apache.org>
Mon, 9 Jan 2017 12:30:02 +0000 (12:30 +0000)
committerRobert Newson <rnewson@apache.org>
Wed, 11 Jan 2017 18:08:57 +0000 (18:08 +0000)
A view query that uses both a filter and a limit value can appear to
'hang' because the workers continue to run even after the number of
requested items have been returned, as workers that find no more
matches return 'no_pass' messages not 'change' messages, and thus the
short-circuiting code is never activated.

COUCHDB-3269

src/fabric_rpc.erl
src/fabric_view_changes.erl

index d2ef8a9..679a305 100644 (file)
@@ -351,7 +351,9 @@ changes_enumerator(DocInfo, Acc) ->
     #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
     case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of
     [] ->
-        ChangesRow = {no_pass, Seq};
+        ChangesRow = {no_pass, [
+            {pending, Pending-1},
+            {seq, Seq}]};
     Results ->
         Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end,
         ChangesRow = {change, [
index 8f7a6f1..3739744 100644 (file)
@@ -256,6 +256,19 @@ handle_message({complete, Props}, Worker, #collector{limit=0} = State) ->
     end,
     maybe_stop(State#collector{offset = O1});
 
+handle_message({no_pass, Props}, {Worker, From}, #collector{limit=0} = State)
+  when is_list(Props) ->
+    #collector{counters = S0, offset = O0} = State,
+    O1 = case fabric_dict:lookup_element(Worker, O0) of
+        null ->
+            fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0);
+        _ ->
+            O0
+    end,
+    S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
+    rexi:stream_ack(From),
+    maybe_stop(State#collector{counters = S1, offset = O1});
+
 handle_message(#change{} = Row, {Worker, From}, St) ->
     Change = {change, [
         {seq, Row#change.key},
@@ -288,7 +301,12 @@ handle_message({change, Props}, {Worker, From}, St) ->
     rexi:stream_ack(From),
     {Go, St#collector{counters=S1, offset=O1, limit=Limit-1, user_acc=Acc}};
 
-handle_message({no_pass, Seq}, {Worker, From}, St) ->
+%% upgrade clause
+handle_message({no_pass, Seq}, From, St) when is_integer(Seq) ->
+    handle_message({no_pass, [{seq, Seq}]}, From, St);
+
+handle_message({no_pass, Props}, {Worker, From}, St) ->
+    Seq = couch_util:get_value(seq, Props),
     #collector{counters = S0} = St,
     true = fabric_dict:is_key(Worker, S0),
     S1 = fabric_dict:store(Worker, Seq, S0),