SOLR-13171 : A true streaming parser for javabin payload/stream without creating...
authorNoble Paul <noble@apache.org>
Wed, 13 Feb 2019 07:23:43 +0000 (18:23 +1100)
committerNoble Paul <noble@apache.org>
Wed, 13 Feb 2019 07:23:43 +0000 (18:23 +1100)
18 files changed:
solr/CHANGES.txt
solr/core/src/java/org/apache/solr/response/BinaryResponseWriter.java
solr/solrj/src/java/org/apache/solr/client/solrj/FastStreamingDocsCallback.java [new file with mode: 0644]
solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java
solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
solr/solrj/src/java/org/apache/solr/common/util/DataEntry.java [new file with mode: 0644]
solr/solrj/src/java/org/apache/solr/common/util/DataInputInputStream.java
solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
solr/solrj/src/java/org/apache/solr/common/util/FastJavaBinDecoder.java [new file with mode: 0644]
solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
solr/solrj/src/java/org/apache/solr/common/util/StringBytes.java [new file with mode: 0644]
solr/solrj/src/java/org/apache/solr/common/util/Utf8CharSequence.java
solr/solrj/src/java/org/apache/solr/common/util/Utils.java
solr/solrj/src/test-files/solrj/javabin_sample.bin [new file with mode: 0644]
solr/solrj/src/test/org/apache/solr/common/util/TestFastJavabinDecoder.java [new file with mode: 0644]
solr/solrj/src/test/org/apache/solr/common/util/TestJavaBinCodec.java
solr/solrj/src/test/org/apache/solr/common/util/Utf8CharSequenceTest.java

index 70a030c..460daae 100644 (file)
@@ -52,6 +52,7 @@ Upgrade Notes
 New Features
 ----------------------
 
+* SOLR-13171 : A true streaming parser for javabin payload/stream without creating any objects (noble)
 
 Bug Fixes
 ----------------------
index be6317b..4b1fdc5 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.response;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -60,6 +61,14 @@ public class BinaryResponseWriter implements BinaryQueryResponseWriter {
     }
   }
 
+  private static void serialize(SolrQueryResponse response,Resolver resolver, String f) throws IOException {
+    try (JavaBinCodec jbc = new JavaBinCodec(resolver); FileOutputStream fos = new FileOutputStream(f)) {
+      jbc.setWritableDocFields(resolver).marshal(response.getValues(), fos);
+      fos.flush();
+    }
+
+  }
+
   @Override
   public void write(Writer writer, SolrQueryRequest request, SolrQueryResponse response) throws IOException {
     throw new RuntimeException("This is a binary writer , Cannot write to a characterstream");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/FastStreamingDocsCallback.java b/solr/solrj/src/java/org/apache/solr/client/solrj/FastStreamingDocsCallback.java
new file mode 100644 (file)
index 0000000..57701ff
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj;
+
+
+import org.apache.solr.common.util.DataEntry;
+
+public interface FastStreamingDocsCallback {
+  /** callback for a doclist
+   *
+   * @return the object to be shared with all the {{@link #startDoc(Object)}} calls. return null if nothing needs to be shared
+   */
+  default Object initDocList(Long numFound, Long start, Float maxScore) {
+    return null;
+  }
+
+
+  /**
+   * Started a document
+   *
+   * @param docListObj This object is the value returned by the {{@link #initDocList(Long, Long, Float)}} method
+   * @return any arbitrary object that should be shared between each field
+   */
+  Object startDoc(Object docListObj);
+
+  /**
+   * FOund a new field
+   *
+   * @param field  Read the appropriate value
+   * @param docObj The object returned by {{@link #startDoc(Object)}} method
+   */
+  void field(DataEntry field, Object docObj);
+
+  /**
+   * A document ends
+   *
+   * @param docObj The object returned by {{@link #startDoc(Object)}} method
+   */
+  default void endDoc(Object docObj) { }
+
+  /** A new child doc starts
+   * @param parentDocObj an objec that will be shared across all the  {{@link FastStreamingDocsCallback#field(DataEntry, Object)}}
+   * @return any custom object that be shared with the fields in this child doc
+   */
+  default Object startChildDoc(Object parentDocObj) {
+    return null;
+  }
+
+
+
+}
index 8aebea0..885edc9 100644 (file)
@@ -1058,9 +1058,19 @@ public abstract class SolrClient implements Serializable, Closeable {
    */
   public QueryResponse queryAndStreamResponse(String collection, SolrParams params, StreamingResponseCallback callback)
       throws SolrServerException, IOException {
-    ResponseParser parser = new StreamingBinaryResponseParser(callback);
+    return getQueryResponse(collection, params,  new StreamingBinaryResponseParser(callback));
+  }
+
+  public QueryResponse queryAndStreamResponse(String collection, SolrParams params, FastStreamingDocsCallback callback)
+      throws SolrServerException, IOException {
+    return getQueryResponse(collection, params, new StreamingBinaryResponseParser(callback));
+  }
+
+  private QueryResponse getQueryResponse(String collection, SolrParams params, ResponseParser parser) throws SolrServerException, IOException {
     QueryRequest req = new QueryRequest(params);
-    req.setStreamingResponseCallback(callback);
+    if (parser instanceof StreamingBinaryResponseParser) {
+      req.setStreamingResponseCallback(((StreamingBinaryResponseParser) parser).callback);
+    }
     req.setResponseParser(parser);
     return req.process(this, collection);
   }
index b70daee..5c41f6b 100644 (file)
@@ -20,11 +20,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
+import org.apache.solr.client.solrj.FastStreamingDocsCallback;
 import org.apache.solr.client.solrj.StreamingResponseCallback;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.DataEntry;
+import org.apache.solr.common.util.DataEntry.EntryListener;
 import org.apache.solr.common.util.DataInputInputStream;
+import org.apache.solr.common.util.FastJavaBinDecoder;
+import org.apache.solr.common.util.FastJavaBinDecoder.EntryImpl;
+import org.apache.solr.common.util.FastJavaBinDecoder.Tag;
 import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.solr.common.util.NamedList;
 
@@ -36,64 +42,133 @@ import org.apache.solr.common.util.NamedList;
  * @since solr 4.0
  */
 public class StreamingBinaryResponseParser extends BinaryResponseParser {
-  final StreamingResponseCallback callback;
-  
-  public StreamingBinaryResponseParser( StreamingResponseCallback cb )
-  {
+  public final StreamingResponseCallback callback;
+  public final FastStreamingDocsCallback fastCallback;
+
+  public StreamingBinaryResponseParser(StreamingResponseCallback cb) {
     this.callback = cb;
+    fastCallback = null;
+  }
+
+  public StreamingBinaryResponseParser(FastStreamingDocsCallback cb) {
+    this.fastCallback = cb;
+    this.callback = null;
+
   }
   
   @Override
   public NamedList<Object> processResponse(InputStream body, String encoding) {
-    try (JavaBinCodec codec = new JavaBinCodec() {
+    if (callback != null) {
+      return streamDocs(body);
+    } else {
+      try {
+        return fastStreamDocs(body, fastCallback);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to parse", e);
+      }
+    }
 
-        private int nestedLevel;
-
-        @Override
-        public SolrDocument readSolrDocument(DataInputInputStream dis) throws IOException {
-          nestedLevel++;
-          SolrDocument doc = super.readSolrDocument(dis);
-          nestedLevel--;
-          if (nestedLevel == 0) {
-            // parent document
-            callback.streamSolrDocument(doc);
-            return null;
-          } else {
-            // child document
-            return doc;
-          }
+  }
+
+  private NamedList<Object> fastStreamDocs(InputStream body, FastStreamingDocsCallback fastCallback) throws IOException {
+
+    fieldListener = new EntryListener() {
+      @Override
+      public void entry(DataEntry field) {
+        if (((EntryImpl) field).getTag() == Tag._SOLRDOC) {
+          field.listenContainer(fastCallback.startChildDoc(field.ctx()), fieldListener);
+        } else {
+          fastCallback.field(field,  field.ctx());
         }
+      }
 
-        @Override
-        public SolrDocumentList readSolrDocumentList(DataInputInputStream dis) throws IOException {
-          SolrDocumentList solrDocs = new SolrDocumentList();
-          List list = (List) readVal(dis);
-          solrDocs.setNumFound((Long) list.get(0));
-          solrDocs.setStart((Long) list.get(1));
-          solrDocs.setMaxScore((Float) list.get(2));
-
-          callback.streamDocListInfo( 
-              solrDocs.getNumFound(), 
-              solrDocs.getStart(), 
-              solrDocs.getMaxScore() );
-          
-          // Read the Array
-          tagByte = dis.readByte();
-          if( (tagByte >>> 5) != (ARR >>> 5) ) {
-            throw new RuntimeException( "doclist must have an array" );
-          } 
-          int sz = readSize(dis);
-          for (int i = 0; i < sz; i++) {
-            // must be a SolrDocument
-            readVal( dis ); 
+      @Override
+      public void end(DataEntry e) {
+        fastCallback.endDoc(((EntryImpl) e).ctx);
+      }
+    };
+    docListener = e -> {
+      EntryImpl entry = (EntryImpl) e;
+      if (entry.getTag() == Tag._SOLRDOC) {//this is a doc
+        entry.listenContainer(fastCallback.startDoc(entry.ctx()), fieldListener);
+      }
+    };
+    new FastJavaBinDecoder()
+        .withInputStream(body)
+        .decode(new EntryListener() {
+          @Override
+          public void entry(DataEntry e) {
+            EntryImpl entry = (EntryImpl) e;
+            if( !entry.type().isContainer) return;
+            if (e.isKeyValEntry() && entry.getTag() == Tag._SOLRDOCLST) {
+              List l = (List) e.metadata();
+              e.listenContainer(fastCallback.initDocList(
+                  (Long) l.get(0),
+                  (Long) l.get(1),
+                  (Float) l.get(2)),
+                  docListener);
+            } else {
+              e.listenContainer(null, this);
+            }
           }
-          return solrDocs;
+        });
+    return null;
+  }
+
+
+  private EntryListener fieldListener;
+  private EntryListener docListener;
+
+
+  private NamedList<Object> streamDocs(InputStream body) {
+    try (JavaBinCodec codec = new JavaBinCodec() {
+
+      private int nestedLevel;
+
+      @Override
+      public SolrDocument readSolrDocument(DataInputInputStream dis) throws IOException {
+        nestedLevel++;
+        SolrDocument doc = super.readSolrDocument(dis);
+        nestedLevel--;
+        if (nestedLevel == 0) {
+          // parent document
+          callback.streamSolrDocument(doc);
+          return null;
+        } else {
+          // child document
+          return doc;
         }
-      };) {
-      
+      }
+
+      @Override
+      public SolrDocumentList readSolrDocumentList(DataInputInputStream dis) throws IOException {
+        SolrDocumentList solrDocs = new SolrDocumentList();
+        List list = (List) readVal(dis);
+        solrDocs.setNumFound((Long) list.get(0));
+        solrDocs.setStart((Long) list.get(1));
+        solrDocs.setMaxScore((Float) list.get(2));
+
+        callback.streamDocListInfo(
+            solrDocs.getNumFound(),
+            solrDocs.getStart(),
+            solrDocs.getMaxScore());
+
+        // Read the Array
+        tagByte = dis.readByte();
+        if ((tagByte >>> 5) != (ARR >>> 5)) {
+          throw new RuntimeException("doclist must have an array");
+        }
+        int sz = readSize(dis);
+        for (int i = 0; i < sz; i++) {
+          // must be a SolrDocument
+          readVal(dis);
+        }
+        return solrDocs;
+      }
+    };) {
+
       return (NamedList<Object>) codec.unmarshal(body);
-    } 
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "parsing error", e);
     }
   }
index 0563c9e..fd56d9d 100644 (file)
@@ -409,6 +409,7 @@ public class SolrDocument extends SolrDocumentBase<Object, SolrDocument> impleme
 
   @Override
   public int getChildDocumentCount() {
+    if (_childDocuments == null) return 0;
     return _childDocuments.size();
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/DataEntry.java b/solr/solrj/src/java/org/apache/solr/common/util/DataEntry.java
new file mode 100644 (file)
index 0000000..04b3a2d
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This represents a data entry in the payload/stream. There are multiple ways to consume the data entry
+ * a) listen to it, if it's a container object, and get callbacks for each sub-entry
+ * b) read as an object using the {{@link #val()}} method. Please note that it creates objects and expect more memory usage
+ * c) read the corresponding primitive value
+ * Do not keep a reference of this Object beyond the scope where it is called. Read the relevant data out.
+ */
+public interface DataEntry {
+  /**
+   * The data type
+   */
+  DataEntry.Type type();
+
+  /**
+   * The index of this entry in the container
+   */
+  long index();
+
+  int intVal();
+
+  long longVal();
+
+  float floatVal();
+
+  double doubleVal();
+
+  boolean boolVal();
+
+  default String strValue() {
+    if (type() == null) return null;
+    return val().toString();
+  }
+
+  /**
+   * The object value
+   */
+  Object val();
+
+  /**
+   * Register a listener to get callbacks for all entries
+   *
+   * @param ctx      This is any object that should be shared with the child entry callbacks
+   * @param listener The listener that handles each entry in this container
+   */
+  void listenContainer(Object ctx, EntryListener listener);
+
+  /**
+   * Some Objects may have metadata. usually there is none
+   */
+
+  Object metadata();
+
+  /**Depth of this Object. The root most object has a depth of 1
+   */
+  int depth();
+
+  /**
+   * If this is a child of another container object this returns a non-null value
+   *
+   * @return the parent container object
+   */
+  DataEntry parent();
+
+  /**
+   * This is the object shared in the parent container in the {{@link #listenContainer(Object, EntryListener)}} method
+   */
+  Object ctx();
+
+  /**
+   * If it is a non-primitive type type and size is known in advance
+   *
+   * if it's a map/list, it's the no:of items in this container
+   *
+   * if it's a {{@link CharSequence}} or byte[] , it's the no:of bytes in the stream
+   *
+   * @return a number greater than or equal to zero if the size is known, -1 if unknown
+   */
+  int length();
+
+  /**
+   * If this object is a key value entry. key value entries have name
+   */
+  boolean isKeyValEntry();
+
+  /**
+   * The name, if this is a map entry , else it returns a null
+   */
+  CharSequence name();
+
+  /**
+   * The types are a superset of json
+   */
+  enum Type {
+    NULL(true),
+    LONG(true),
+    INT(true),
+    BOOL(true),
+    FLOAT(true),
+    DOUBLE(true),
+    DATE(true),
+    /**
+     * A map like json object
+     */
+    KEYVAL_ITER(false, true),
+    /**
+     * An array like json object
+     */
+    ENTRY_ITER(false, true),
+    STR(false),
+    BYTEARR(false),
+    /**
+     * don't know how to stream it. read as an object using {{@link DataEntry#val()}} method
+     */
+    JAVA_OBJ(false);
+    /**
+     * A primitive type which usually maps to a java primitive
+     */
+    public final boolean isPrimitive;
+
+    public final boolean isContainer;
+
+    Type(boolean isPrimitive) {
+      this(isPrimitive, false);
+    }
+
+    Type(boolean isPrimitive, boolean isContainer) {
+      this.isPrimitive = isPrimitive;
+      this.isContainer = isContainer;
+    }
+  }
+
+  interface EntryListener {
+
+    /**
+     * Callback for each entry in this container. once the method call returns, the entry object is not valid anymore
+     * It is usually reused.
+     * If the object value is a {{@link Utf8CharSequence}} do a {{@link Object#clone()}} because the object may be reused
+     *
+     * @param e The entry in the container
+     */
+    void entry(DataEntry e);
+
+    /**
+     * Callback after all entries of this container are streamed
+     *
+     * @param e the container entry
+     */
+    default void end(DataEntry e) {
+    }
+  }
+
+  interface FastDecoder {
+
+    FastDecoder withInputStream(InputStream is);
+
+    Object decode(EntryListener iterListener) throws IOException;
+
+  }
+}
index b8a07be..222bebd 100644 (file)
@@ -18,9 +18,23 @@ package org.apache.solr.common.util;
 
 import java.io.DataInput;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 /**
  * An abstract DataInput that extends InputStream
  */
 public abstract class DataInputInputStream extends InputStream implements DataInput {
+
+  /**If possible, read UTF8 bytes directly from the underlying buffer
+   *
+   * @param utf8 the utf8 ubject to read into
+   * @param len length of the utf8 stream
+   * @return whether it is possible to do a direct read or not
+   */
+  boolean readDirectUtf8(ByteArrayUtf8CharSequence utf8, int len){return false;}
+
+  /**If possible, read ByteBuffer directly from the underlying buffer
+   * @param sz the size of the buffer to be read
+   */
+  public ByteBuffer readDirectByteBuffer(int sz){return null;};
 }
index bbcc129..f7d633d 100644 (file)
@@ -44,6 +44,13 @@ public class FastInputStream extends DataInputInputStream {
     this.end = end;
   }
 
+  @Override
+  boolean readDirectUtf8(ByteArrayUtf8CharSequence utf8, int len) {
+    if (in != null || end < pos + len) return false;
+    utf8.reset(buf, pos, len, null);
+    pos = pos + len;
+    return true;
+  }
 
   public static FastInputStream wrap(InputStream in) {
     return (in instanceof FastInputStream) ? (FastInputStream)in : new FastInputStream(in);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/FastJavaBinDecoder.java b/solr/solrj/src/java/org/apache/solr/common/util/FastJavaBinDecoder.java
new file mode 100644 (file)
index 0000000..2ded345
--- /dev/null
@@ -0,0 +1,829 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.DataEntry.EntryListener;
+
+import static org.apache.solr.common.util.FastJavaBinDecoder.Tag._EXTERN_STRING;
+import static org.apache.solr.common.util.JavaBinCodec.*;
+
+public class FastJavaBinDecoder implements DataEntry.FastDecoder {
+  private StreamCodec codec;
+  private EntryImpl rootEntry = new EntryImpl();
+  private InputStream stream;
+
+  private static final DataEntry.EntryListener emptylistener = e -> {
+  };
+
+
+  @Override
+  public FastJavaBinDecoder withInputStream(InputStream is) {
+    this.stream = is;
+    return this;
+  }
+
+  @Override
+  public Object decode(EntryListener listener) throws IOException {
+    rootEntry.entryListener = listener == null ? emptylistener : listener;
+    codec = new StreamCodec(stream);
+    codec.start();
+    EntryImpl entry = codec.beginRead(rootEntry);
+    listener.entry(entry);
+    if (entry.tag.type.isContainer && entry.entryListener != null) {
+      entry.tag.stream(entry, codec);
+    }
+    return entry.ctx;
+  }
+
+
+  static class StreamCodec extends JavaBinCodec {
+
+    final FastInputStream dis;
+
+    StreamCodec(InputStream is) {
+      this.dis = FastInputStream.wrap(is);
+    }
+
+
+    public void skip(int sz) throws IOException {
+      while (sz > 0) {
+        int read = dis.read(bytes, 0, Math.min(bytes.length, sz));
+        sz -= read;
+      }
+
+    }
+
+
+    void start() throws IOException {
+      _init(dis);
+    }
+
+
+    Tag getTag() throws IOException {
+      tagByte = dis.readByte();
+      switch (tagByte >>> 5) {
+        case STR >>> 5:
+          return Tag._STR;
+        case SINT >>> 5:
+          return Tag._SINT;
+        case SLONG >>> 5:
+          return Tag._SLONG;
+        case ARR >>> 5:
+          return Tag._ARR;
+        case ORDERED_MAP >>> 5:
+          return Tag._ORDERED_MAP;
+        case NAMED_LST >>> 5:
+          return Tag._NAMED_LST;
+        case EXTERN_STRING >>> 5:
+          return _EXTERN_STRING;
+      }
+
+      Tag t = lower5BitTags[tagByte];
+      if (t == null) throw new RuntimeException("Invalid type " + tagByte);
+      return t;
+    }
+
+    public ByteBuffer readByteBuffer(DataInputInputStream dis, int sz) throws IOException {
+      ByteBuffer result = dis.readDirectByteBuffer(sz);
+      if(result != null) return result;
+      byte[] arr = new byte[readVInt(dis)];
+      dis.readFully(arr);
+      return ByteBuffer.wrap(arr);
+    }
+
+    public CharSequence readObjKey(Tag ktag) throws IOException {
+      CharSequence key = null;
+      if (ktag.type == DataEntry.Type.STR) {
+        if (ktag == _EXTERN_STRING) key = readExternString(dis);
+        else key = readStr(dis);
+      } else if (ktag.type == DataEntry.Type.NULL) {
+        //no need to do anything
+      } else {
+        throw new RuntimeException("Key must be String");
+      }
+      return key;
+    }
+
+    public EntryImpl beginRead(EntryImpl parent) throws IOException {
+      EntryImpl entry = parent.getChildAndReset();
+      entry.tag = getTag();
+      entry.tag.lazyRead(entry, this);
+      if (entry.tag.type.isPrimitive) entry.consumedFully = true;
+      return entry;
+    }
+  }
+
+
+  public class EntryImpl implements DataEntry {
+    //size
+    int size = -1;
+    Tag tag;
+    Object metadata;
+    EntryImpl parent, child;
+    long numericVal;
+    double doubleVal;
+    Object objVal;
+    public Object ctx;
+    boolean boolVal;
+    boolean mapEntry;
+    long idx;
+
+    EntryListener entryListener;
+
+    boolean consumedFully = false;
+    int depth = 0;
+    CharSequence name;
+
+
+    EntryImpl getChildAndReset() {
+      if (child == null) {
+        child = new EntryImpl();
+        child.parent = this;
+        child.depth = depth + 1;
+      }
+      child.reset();
+      return child;
+
+    }
+
+    @Override
+    public long index() {
+      return idx;
+    }
+
+    @Override
+    public int length() {
+      return size;
+    }
+
+    public Tag getTag() {
+      return tag;
+    }
+
+    @Override
+    public boolean boolVal() {
+      return boolVal;
+    }
+
+    @Override
+    public boolean isKeyValEntry() {
+      return mapEntry;
+    }
+
+    @Override
+    public CharSequence name() {
+      return name;
+    }
+
+    @Override
+    public int depth() {
+      return depth;
+    }
+
+    @Override
+    public DataEntry parent() {
+      return parent;
+    }
+
+    @Override
+    public Object metadata() {
+      return metadata;
+    }
+
+    @Override
+    public Object ctx() {
+      return parent == null ? null : parent.ctx;
+    }
+
+    @Override
+    public Type type() {
+      return tag.type;
+    }
+
+    @Override
+    public int intVal() {
+      return (int) numericVal;
+    }
+
+    @Override
+    public long longVal() {
+      return numericVal;
+    }
+
+    @Override
+    public float floatVal() {
+      if(tag.type == Type.FLOAT) return (float) doubleVal;
+      else {
+        return ((Number) val()).floatValue();
+      }
+    }
+
+    @Override
+    public double doubleVal() {
+      return doubleVal;
+    }
+
+    @Override
+    public Object val() {
+      if (objVal != null) return objVal;
+      try {
+        return objVal = tag.readObject(codec, this);
+      } catch (IOException e) {
+        throw new RuntimeException("Error with stream", e);
+      } finally {
+        consumedFully = true;
+      }
+    }
+
+    @Override
+    public void listenContainer(Object ctx, EntryListener listener) {
+      this.entryListener = listener;
+      this.ctx = ctx;
+    }
+
+    void reset() {
+      this.doubleVal = 0.0d;
+      this.numericVal = 0l;
+      this.objVal = null;
+      this.ctx = null;
+      this.entryListener = null;
+      this.size = -1;
+      this.tag = null;
+      consumedFully = false;
+      metadata = null;
+      name = null;
+      idx = -1;
+    }
+
+    public void callEnd() {
+      if (entryListener != null) entryListener.end(this);
+
+    }
+  }
+
+  static final boolean LOWER_5_BITS = true;
+  static final boolean UPPER_3_BITS = false;
+
+  public enum Tag {
+    _NULL(NULL, LOWER_5_BITS, DataEntry.Type.NULL),
+    _BOOL_TRUE(BOOL_TRUE, LOWER_5_BITS, DataEntry.Type.BOOL) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) {
+        entry.boolVal = true;
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Boolean.TRUE;
+      }
+    },
+    _BOOL_FALSE(BOOL_FALSE, LOWER_5_BITS, DataEntry.Type.BOOL) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) {
+        entry.boolVal = false;
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Boolean.FALSE;
+      }
+    },
+    _BYTE(BYTE, LOWER_5_BITS, DataEntry.Type.INT) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.numericVal = streamCodec.dis.readByte();
+        entry.consumedFully = true;
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Byte.valueOf((byte) entry.numericVal);
+      }
+    },
+    _SHORT(SHORT, LOWER_5_BITS, DataEntry.Type.INT) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.numericVal = streamCodec.dis.readShort();
+        entry.consumedFully = true;
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Short.valueOf((short) entry.numericVal);
+      }
+    },
+    _DOUBLE(DOUBLE, LOWER_5_BITS, DataEntry.Type.DOUBLE) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.doubleVal = streamCodec.dis.readDouble();
+        entry.consumedFully = true;
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Double.valueOf(entry.doubleVal);
+      }
+    },
+    _INT(INT, LOWER_5_BITS, DataEntry.Type.INT) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.numericVal = streamCodec.dis.readInt();
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Integer.valueOf((int) entry.numericVal);
+      }
+
+    },//signed integer
+    _LONG(LONG, LOWER_5_BITS, DataEntry.Type.LONG) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.numericVal = streamCodec.dis.readLong();
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Long.valueOf(entry.numericVal);
+      }
+    },
+    _FLOAT(FLOAT, LOWER_5_BITS, DataEntry.Type.FLOAT) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.doubleVal = streamCodec.dis.readFloat();
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Float.valueOf((float) entry.doubleVal);
+      }
+    },
+    _DATE(DATE, LOWER_5_BITS, DataEntry.Type.DATE) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+        entry.numericVal = streamCodec.dis.readLong();
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return new Date(entry.numericVal);
+      }
+    },
+    _MAP(MAP, LOWER_5_BITS, DataEntry.Type.KEYVAL_ITER) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.size = readObjSz(codec, entry.tag);
+      }
+
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        try {
+          for (int i = 0; i < entry.size; i++) {
+            CharSequence key = codec.readObjKey(codec.getTag());
+            callbackMapEntryListener(entry, key, codec, i);
+          }
+        } finally {
+          entry.callEnd();
+        }
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readMap(codec.dis,entry.size);
+      }
+    },
+    _SOLRDOC(SOLRDOC, LOWER_5_BITS, DataEntry.Type.KEYVAL_ITER) {
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        try {
+          codec.getTag();
+          entry.size = codec.readSize(codec.dis);//  readObjSz(codec, entry.tag);
+          for (int i = 0; i < entry.size; i++) {
+            Tag tag = codec.getTag();
+            if (tag == _SOLRDOC) {
+              EntryImpl e = entry.getChildAndReset();
+              e.tag = tag;
+              e.idx = i;
+              Tag.callbackIterListener(entry, e, codec);
+            } else {
+              CharSequence key = codec.readObjKey(tag);
+              callbackMapEntryListener(entry, key, codec, i);
+            }
+
+          }
+        } finally {
+          entry.callEnd();
+
+        }
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readSolrDocument(codec.dis);
+      }
+    },
+    _SOLRDOCLST(SOLRDOCLST, LOWER_5_BITS, DataEntry.Type.ENTRY_ITER) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.metadata = codec.readVal(codec.dis);
+        codec.getTag();//ignore this
+        entry.size = codec.readSize(codec.dis);
+      }
+
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        try {
+          for (int i = 0; i < entry.size; i++) {
+            EntryImpl newEntry = codec.beginRead(entry);
+            newEntry.idx = i;
+            Tag.callbackIterListener(entry, newEntry, codec);
+          }
+        } finally {
+          entry.callEnd();
+        }
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        SolrDocumentList solrDocs = new SolrDocumentList();
+        if(entry.metadata != null){
+          List list = (List) entry.metadata;
+          solrDocs.setNumFound((Long) list.get(0));
+          solrDocs.setStart((Long) list.get(1));
+          solrDocs.setMaxScore((Float) list.get(2));
+        }
+        List<SolrDocument> l =  codec.readArray(codec.dis, entry.size);
+        solrDocs.addAll(l);
+        return solrDocs;
+      }
+    },
+    _BYTEARR(BYTEARR, LOWER_5_BITS, DataEntry.Type.BYTEARR) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.size = readVInt(codec.dis);
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        ByteBuffer buf = codec.readByteBuffer(codec.dis, entry.size);
+        entry.size = buf.limit() - buf.position();
+        return buf;
+      }
+
+      @Override
+      public void skip(EntryImpl entry, StreamCodec codec) throws IOException {
+        codec.skip(entry.size);
+      }
+    },
+    _ITERATOR(ITERATOR, LOWER_5_BITS, DataEntry.Type.ENTRY_ITER) {
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        try {
+          long idx = 0;
+          while (true) {
+            EntryImpl newEntry = codec.beginRead(entry);
+            newEntry.idx = idx++;
+            if (newEntry.tag == _END) break;
+            newEntry.idx = idx++;
+            Tag.callbackIterListener(entry, newEntry, codec);
+          }
+        } finally {
+          entry.callEnd();
+        }
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readIterator(codec.dis);
+      }
+    },
+
+    _END(END, LOWER_5_BITS, null),
+
+    _SOLRINPUTDOC(SOLRINPUTDOC, LOWER_5_BITS, DataEntry.Type.JAVA_OBJ) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.objVal = readObject(codec, entry);
+        entry.consumedFully = true;
+      }
+    },
+    _MAP_ENTRY_ITER(MAP_ENTRY_ITER, LOWER_5_BITS, DataEntry.Type.KEYVAL_ITER) {
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        long idx = 0;
+        for (; ; ) {
+          Tag tag = codec.getTag();
+          if (tag == Tag._END) break;
+          CharSequence key = codec.readObjKey(tag);
+          callbackMapEntryListener(entry, key, codec, idx++);
+        }
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readMapIter(codec.dis);
+      }
+    },
+    _ENUM_FIELD_VALUE(ENUM_FIELD_VALUE, LOWER_5_BITS, DataEntry.Type.JAVA_OBJ) {
+
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.objVal =codec.readEnumFieldValue(codec.dis);
+        entry.consumedFully = true;
+      }
+    },
+    _MAP_ENTRY(MAP_ENTRY, LOWER_5_BITS, DataEntry.Type.JAVA_OBJ) {
+      //doesn't support streaming
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.objVal = codec.readMapEntry(codec.dis);
+        entry.consumedFully = true;
+      }
+    },
+    // types that combine tag + length (or other info) in a single byte
+    _TAG_AND_LEN(TAG_AND_LEN, UPPER_3_BITS, null),
+    _STR(STR, UPPER_3_BITS, DataEntry.Type.STR) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.size = readObjSz(codec, this);
+
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readUtf8(codec.dis);
+      }
+
+      @Override
+      public void skip(EntryImpl entry, StreamCodec codec) throws IOException {
+        codec.skip(entry.size);
+      }
+    },
+    _SINT(SINT, UPPER_3_BITS, DataEntry.Type.INT) {//unsigned integer
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.numericVal = codec.readSmallInt(codec.dis);
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Integer.valueOf((int) entry.numericVal);
+      }
+
+    },
+    _SLONG(SLONG, UPPER_3_BITS, DataEntry.Type.LONG) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.numericVal = codec.readSmallLong(codec.dis);
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) {
+        return Long.valueOf((int) entry.numericVal);
+      }
+
+
+    },
+    _ARR(ARR, UPPER_3_BITS, DataEntry.Type.ENTRY_ITER) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.size = readObjSz(codec, this);
+      }
+
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        for (int i = 0; i < entry.size; i++) {
+          EntryImpl newEntry = codec.beginRead(entry);
+          newEntry.idx = i;
+          Tag.callbackIterListener(entry, newEntry, codec);
+        }
+      }
+
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readArray(codec.dis);
+      }
+    }, //
+    _ORDERED_MAP(ORDERED_MAP, UPPER_3_BITS, DataEntry.Type.KEYVAL_ITER) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.size = readObjSz(codec, entry.tag);
+      }
+
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        _MAP.stream(entry, codec);
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readOrderedMap(codec.dis);
+      }
+
+    }, // SimpleOrderedMap (a NamedList subclass, and more common)
+    _NAMED_LST(NAMED_LST, UPPER_3_BITS, DataEntry.Type.KEYVAL_ITER) {
+      @Override
+      public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+        entry.size = readObjSz(codec, entry.tag);
+      }
+
+      @Override
+      public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+        _MAP.stream(entry, codec);
+      }
+
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readNamedList(codec.dis);
+      }
+    }, // NamedList
+
+    _EXTERN_STRING(EXTERN_STRING, UPPER_3_BITS, DataEntry.Type.STR) {
+      @Override
+      public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+        return codec.readExternString(codec.dis);
+      }
+    };
+
+    private static int readObjSz(StreamCodec codec, Tag tag) throws IOException {
+      return tag.isLower5Bits ?
+          StreamCodec.readVInt(codec.dis) :
+          codec.readSize(codec.dis);
+    }
+
+    private static void callbackMapEntryListener(EntryImpl entry, CharSequence key, StreamCodec codec, long idx)
+        throws IOException {
+      EntryImpl newEntry = codec.beginRead(entry);
+      newEntry.name = key;
+      newEntry.mapEntry = true;
+      newEntry.idx = idx;
+      try {
+        if (entry.entryListener != null) entry.entryListener.entry(newEntry);
+      } finally {
+        // the listener did not consume the entry
+        postCallback(codec, newEntry);
+      }
+    }
+
+    private static void callbackIterListener(EntryImpl parent, EntryImpl newEntry, StreamCodec codec)
+        throws IOException {
+      try {
+        newEntry.mapEntry = false;
+        if(parent.entryListener != null) parent.entryListener.entry(newEntry);
+      } finally {
+        // the listener did not consume the entry
+        postCallback(codec, newEntry);
+      }
+    }
+
+    private static void postCallback(StreamCodec codec, EntryImpl newEntry) throws IOException {
+      if (!newEntry.consumedFully) {
+        if (newEntry.tag.type.isContainer) {
+          //this is a map like container object and there is a listener
+          if (newEntry.entryListener == null) newEntry.entryListener = emptylistener;
+          newEntry.tag.stream(newEntry, codec);
+        } else {
+          newEntry.tag.skip(newEntry, codec);
+        }
+      }
+    }
+
+
+    final int code;
+    final boolean isLower5Bits;
+    final DataEntry.Type type;
+
+    Tag(int code, boolean isLower5Bits, DataEntry.Type type) {
+      this.code = code;
+      this.isLower5Bits = isLower5Bits;
+      this.type = type;
+    }
+
+    /**
+     * This applies to only container Objects. This is invoked only if there is a corresponding listener.
+     *
+     */
+    public void stream(EntryImpl currentEntry, StreamCodec codec) throws IOException {
+
+
+    }
+
+    /**
+     * This should read the minimal data about the entry . if the data is a primitive type ,
+     * read the whole thing
+     */
+    public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+
+    }
+
+    /**
+     * Read the entry as an Object. The behavior should be similar to that of {@link JavaBinCodec#readObject(DataInputInputStream)}
+     */
+    public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+      throw new RuntimeException("Unsupported object : " + this.name());
+    }
+
+    /**
+     * Read the entry from and discard the data. Do not create any objects
+     */
+    public void skip(EntryImpl entry, StreamCodec codec) throws IOException {
+      if (entry.tag.type == DataEntry.Type.KEYVAL_ITER || entry.tag.type == DataEntry.Type.ENTRY_ITER) {
+        entry.entryListener = null;
+        stream(entry, codec);
+      } else if (!entry.tag.type.isPrimitive) {
+        readObject(codec, entry);
+      }
+
+    }
+  }
+
+  static final private Tag[] lower5BitTags = new Tag[32];
+
+  static {
+    for (Tag tag : Tag.values()) {
+      if (tag.isLower5Bits) {
+        lower5BitTags[tag.code] = tag;
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    for (int i = 0; i < lower5BitTags.length; i++) {
+      Tag tag = lower5BitTags[i];
+      if (tag == null) continue;
+      System.out.println(tag.name() + " : " + tag.code + (tag.isLower5Bits ? " lower" : " upper"));
+    }
+  }
+
+
+  private static void addObj(DataEntry e) {
+    if (e.type().isContainer) {
+      Object ctx = e.type() == DataEntry.Type.KEYVAL_ITER ?
+          new LinkedHashMap(getSize(e)) :
+          new ArrayList(getSize(e));
+      if (e.ctx() != null) {
+        if (e.isKeyValEntry()) {
+          ((Map) e.ctx()).put(e.name(), ctx);
+        } else {
+          ((Collection) e.ctx()).add(ctx);
+        }
+      }
+      e.listenContainer(ctx, getEntryListener());
+    } else {
+      Object val = e.val();
+      if (val instanceof Utf8CharSequence) val = ((Utf8CharSequence) val).clone();
+      if (e.ctx() != null) {
+        if (e.isKeyValEntry()) {
+          ((Map) e.ctx()).put(e.name(), val);
+        } else {
+          ((Collection) e.ctx()).add(val);
+        }
+      }
+    }
+  }
+
+  private static int getSize(DataEntry e) {
+    int sz = e.length();
+    if (sz == -1) sz = e.type() == DataEntry.Type.KEYVAL_ITER ? 16 : 10;
+    return sz;
+  }
+
+
+  public static EntryListener getEntryListener() {
+    return ENTRY_LISTENER;
+  }
+
+
+  static final EntryListener ENTRY_LISTENER = FastJavaBinDecoder::addObj;
+
+
+}
index 782d109..9535ee1 100644 (file)
@@ -202,7 +202,7 @@ public class JavaBinCodec implements PushWriter {
     return _init(dis);
   }
 
-  private FastInputStream _init(FastInputStream dis) throws IOException {
+  protected FastInputStream _init(FastInputStream dis) throws IOException {
     version = dis.readByte();
     if (version != VERSION) {
       throw new RuntimeException("Invalid version (expected " + VERSION +
@@ -683,6 +683,10 @@ public class JavaBinCodec implements PushWriter {
   public Map<Object,Object> readMap(DataInputInputStream dis)
           throws IOException {
     int sz = readVInt(dis);
+    return readMap(dis, sz);
+  }
+
+  protected Map<Object, Object> readMap(DataInputInputStream dis, int sz) throws IOException {
     Map<Object, Object> m = newMap(sz);
     for (int i = 0; i < sz; i++) {
       Object key = readVal(dis);
@@ -780,6 +784,10 @@ public class JavaBinCodec implements PushWriter {
 
   public List<Object> readArray(DataInputInputStream dis) throws IOException {
     int sz = readSize(dis);
+    return readArray(dis, sz);
+  }
+
+  protected List readArray(DataInputInputStream dis, int sz) throws IOException {
     ArrayList<Object> l = new ArrayList<>(sz);
     for (int i = 0; i < sz; i++) {
       l.add(readVal(dis));
@@ -796,8 +804,8 @@ public class JavaBinCodec implements PushWriter {
     writeInt(enumFieldValue.toInt());
     writeStr(enumFieldValue.toString());
   }
-  
-  public void writeMapEntry(Entry<Object,Object> val) throws IOException {
+
+  public void writeMapEntry(Map.Entry val) throws IOException {
     writeTag(MAP_ENTRY);
     writeVal(val.getKey());
     writeVal(val.getValue());
@@ -926,12 +934,27 @@ public class JavaBinCodec implements PushWriter {
 
   protected CharSequence readUtf8(DataInputInputStream dis) throws IOException {
     int sz = readSize(dis);
+    return readUtf8(dis, sz);
+  }
+
+  protected CharSequence readUtf8(DataInputInputStream dis, int sz) throws IOException {
+    ByteArrayUtf8CharSequence result = new ByteArrayUtf8CharSequence(null,0,0);
+    if(dis.readDirectUtf8(result, sz)){
+     result.stringProvider= getStringProvider();
+     return result;
+    }
+
     if (sz > MAX_UTF8_SZ) return _readStr(dis, null, sz);
     if (bytesBlock == null) bytesBlock = new BytesBlock(1024 * 4);
     BytesBlock block = this.bytesBlock.expand(sz);
     dis.readFully(block.getBuf(), block.getStartPos(), sz);
 
-    ByteArrayUtf8CharSequence result = new ByteArrayUtf8CharSequence(block.getBuf(), block.getStartPos(), sz);
+    result.reset(block.getBuf(), block.getStartPos(), sz,null);
+    result.stringProvider = getStringProvider();
+    return result;
+  }
+
+  private Function<ByteArrayUtf8CharSequence, String> getStringProvider() {
     if (stringProvider == null) {
       stringProvider = butf8cs -> {
         synchronized (JavaBinCodec.this) {
@@ -941,8 +964,7 @@ public class JavaBinCodec implements PushWriter {
         }
       };
     }
-    result.stringProvider = this.stringProvider;
-    return result;
+    return this.stringProvider;
   }
 
   public void writeInt(int val) throws IOException {
@@ -1231,66 +1253,6 @@ public class JavaBinCodec implements PushWriter {
     }
   }
 
-  public static class StringBytes {
-    byte[] bytes;
-
-    /**
-     * Offset of first valid byte.
-     */
-    int offset;
-
-    /**
-     * Length of used bytes.
-     */
-    private int length;
-    private int hash;
-
-    public StringBytes(byte[] bytes, int offset, int length) {
-      reset(bytes, offset, length);
-    }
-
-    StringBytes reset(byte[] bytes, int offset, int length) {
-      this.bytes = bytes;
-      this.offset = offset;
-      this.length = length;
-      hash = bytes == null ? 0 : Hash.murmurhash3_x86_32(bytes, offset, length, 0);
-      return this;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null) {
-        return false;
-      }
-      if (other instanceof StringBytes) {
-        return this.bytesEquals((StringBytes) other);
-      }
-      return false;
-    }
-
-    boolean bytesEquals(StringBytes other) {
-      assert other != null;
-      if (length == other.length) {
-        int otherUpto = other.offset;
-        final byte[] otherBytes = other.bytes;
-        final int end = offset + length;
-        for (int upto = offset; upto < end; upto++, otherUpto++) {
-          if (bytes[upto] != otherBytes[otherUpto]) {
-            return false;
-          }
-        }
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return hash;
-    }
-  }
-
   @Override
   public void close() throws IOException {
     if (daos != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/StringBytes.java b/solr/solrj/src/java/org/apache/solr/common/util/StringBytes.java
new file mode 100644 (file)
index 0000000..2c95916
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+public class StringBytes {
+  byte[] bytes;
+
+  /**
+   * Offset of first valid byte.
+   */
+  int offset;
+
+  /**
+   * Length of used bytes.
+   */
+  int length;
+  private int hash;
+
+  public StringBytes(byte[] bytes, int offset, int length) {
+    reset(bytes, offset, length);
+  }
+
+  StringBytes reset(byte[] bytes, int offset, int length) {
+    this.bytes = bytes;
+    this.offset = offset;
+    this.length = length;
+    hash = bytes == null ? 0 : Hash.murmurhash3_x86_32(bytes, offset, length, 0);
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof StringBytes) {
+      return this.bytesEquals((StringBytes) other);
+    }
+    return false;
+  }
+
+  boolean bytesEquals(StringBytes other) {
+    assert other != null;
+    if (length == other.length) {
+      int otherUpto = other.offset;
+      final byte[] otherBytes = other.bytes;
+      final int end = offset + length;
+      for (int upto = offset; upto < end; upto++, otherUpto++) {
+        if (bytes[upto] != otherBytes[otherUpto]) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return hash;
+  }
+}
index 1533e5c..9f15afa 100644 (file)
@@ -23,7 +23,7 @@ import java.io.OutputStream;
 /**
  * A byte[] backed String
  */
-public interface Utf8CharSequence extends CharSequence , Comparable {
+public interface Utf8CharSequence extends CharSequence , Comparable, Cloneable {
 
   /**
    * Write the bytes into a buffer. The objective is to avoid the local bytes being exposed to
@@ -71,4 +71,6 @@ public interface Utf8CharSequence extends CharSequence , Comparable {
     }
   }
 
+  Utf8CharSequence clone();
+
 }
index e36091c..d079052 100644 (file)
@@ -84,6 +84,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class Utils {
   public static final Function NEW_HASHMAP_FUN = o -> new HashMap<>();
+  public static final Function NEW_LINKED_HASHMAP_FUN = o -> new LinkedHashMap<>();
   public static final Function NEW_ATOMICLONG_FUN = o -> new AtomicLong();
   public static final Function NEW_ARRAYLIST_FUN = o -> new ArrayList<>();
   public static final Function NEW_HASHSET_FUN = o -> new HashSet<>();
diff --git a/solr/solrj/src/test-files/solrj/javabin_sample.bin b/solr/solrj/src/test-files/solrj/javabin_sample.bin
new file mode 100644 (file)
index 0000000..23cf4bc
Binary files /dev/null and b/solr/solrj/src/test-files/solrj/javabin_sample.bin differ
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/TestFastJavabinDecoder.java b/solr/solrj/src/test/org/apache/solr/common/util/TestFastJavabinDecoder.java
new file mode 100644 (file)
index 0000000..79f1b28
--- /dev/null
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.FastStreamingDocsCallback;
+import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
+import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.FastJavaBinDecoder.Tag;
+
+import static org.apache.solr.common.util.Utils.NEW_ARRAYLIST_FUN;
+import static org.apache.solr.common.util.Utils.NEW_LINKED_HASHMAP_FUN;
+
+public class TestFastJavabinDecoder extends SolrTestCaseJ4 {
+
+
+  public void testTagRead() throws Exception {
+    BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+    FastOutputStream faos = FastOutputStream.wrap(baos);
+
+    JavaBinCodec codec = new JavaBinCodec(faos, null);
+    codec.writeVal(10);
+    codec.writeVal(100);
+    codec.writeVal("Hello!");
+
+    faos.flushBuffer();
+    faos.close();
+
+
+    FastInputStream fis = new FastInputStream(null, baos.getbuf(), 0, baos.size());
+    FastJavaBinDecoder.StreamCodec scodec = new FastJavaBinDecoder.StreamCodec(fis);
+    scodec.start();
+    Tag tag = scodec.getTag();
+    assertEquals(Tag._SINT, tag);
+    assertEquals(10, scodec.readSmallInt(scodec.dis));
+    tag = scodec.getTag();
+    assertEquals(Tag._SINT, tag);
+    assertEquals(100, scodec.readSmallInt(scodec.dis));
+    tag = scodec.getTag();
+    assertEquals(Tag._STR, tag);
+    assertEquals("Hello!", scodec.readStr(fis));
+  }
+
+  public void testSimple() throws IOException {
+    String sampleObj = "{k : v , " +
+        "mapk : {k1: v1, k2 : [v2_1 , v2_2 ]}," +
+        "listk : [ 1, 2, 3 ]," +
+        "maps : [ {id: kov1}, {id : kov2} ,{id:kov3 , longv : 234} ]," +
+        "}";
+
+
+    Map m = (Map) Utils.fromJSONString(sampleObj);
+    BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+    new JavaBinCodec().marshal(m, baos);
+
+    Map m2 = (Map) new JavaBinCodec().unmarshal(new FastInputStream(null, baos.getbuf(), 0, baos.size()));
+
+    LinkedHashMap fastMap = (LinkedHashMap) new FastJavaBinDecoder()
+        .withInputStream(new FastInputStream(null, baos.getbuf(), 0, baos.size()))
+        .decode(FastJavaBinDecoder.getEntryListener());
+    assertEquals(Utils.writeJson(m2, new StringWriter(), true).toString(),
+        Utils.writeJson(fastMap, new StringWriter(), true).toString());
+
+    Object newMap = new FastJavaBinDecoder()
+        .withInputStream(new FastInputStream(null, baos.getbuf(), 0, baos.size()))
+        .decode(e -> {
+          e.listenContainer(new LinkedHashMap<>(), e_ -> {
+            Map rootMap = (Map) e_.ctx();
+            if (e_.type() == DataEntry.Type.ENTRY_ITER) {
+              e_.listenContainer(rootMap.computeIfAbsent(e_.name(), NEW_ARRAYLIST_FUN),
+                  FastJavaBinDecoder.getEntryListener());
+            } else if (e_.type() == DataEntry.Type.KEYVAL_ITER) {
+              e_.listenContainer(rootMap.computeIfAbsent(e_.name(), NEW_LINKED_HASHMAP_FUN), e1 -> {
+                Map m1 = (Map) e1.ctx();
+                if ("k1".equals(e1.name())) {
+                  m1.put(e1.name(), e1.val().toString());
+                }
+                //eat up k2
+              });
+            } else if (e_.type() == DataEntry.Type.STR) {
+              rootMap.put(e_.name(), e_.val().toString());
+            }
+
+          });
+        });
+    ((Map) m2.get("mapk")).remove("k2");
+    assertEquals(Utils.writeJson(m2, new StringWriter(), true).toString(),
+        Utils.writeJson(newMap, new StringWriter(), true).toString());
+
+  }
+
+  public void testFastJavabinStreamingDecoder() throws IOException {
+    BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+    try (InputStream is = getClass().getResourceAsStream("/solrj/javabin_sample.bin")) {
+      IOUtils.copy(is, baos);
+    }
+    SimpleOrderedMap o = (SimpleOrderedMap) new JavaBinCodec().unmarshal(baos.toByteArray());
+    SolrDocumentList list = (SolrDocumentList) o.get("response");
+    System.out.println(" " + list.getNumFound() + " , " + list.getStart() + " , " + list.getMaxScore());
+    class Pojo {
+      long _idx;
+      CharSequence id;
+      boolean inStock;
+      float price;
+      List<NamedList> children;
+    }
+    StreamingBinaryResponseParser parser = new StreamingBinaryResponseParser(new FastStreamingDocsCallback() {
+
+      @Override
+      public Object initDocList(Long numFound, Long start, Float maxScore) {
+        assertEquals((Long) list.getNumFound(), numFound);
+        assertEquals((Long) list.getStart(), start);
+        assertEquals(list.getMaxScore(), maxScore);
+        return new int[1];
+      }
+
+      @Override
+      public Object startDoc(Object docListObj) {
+        Pojo pojo = new Pojo();
+        pojo._idx = ((int[]) docListObj)[0]++;
+        return pojo;
+      }
+
+      @Override
+      public void field(DataEntry field, Object docObj) {
+        Pojo pojo = (Pojo) docObj;
+        if ("id".equals(field.name())) {
+          pojo.id = ((Utf8CharSequence) field.val()).clone();
+        } else if (field.type() == DataEntry.Type.BOOL && "inStock".equals(field.name())) {
+          pojo.inStock = field.boolVal();
+        } else if (field.type() == DataEntry.Type.FLOAT && "price".equals(field.name())) {
+          pojo.price = field.floatVal();
+        }
+
+      }
+
+      @Override
+      public void endDoc(Object docObj) {
+        Pojo pojo = (Pojo) docObj;
+        SolrDocument doc = list.get((int) pojo._idx);
+        assertEquals(doc.get("id"), pojo.id.toString());
+        if (doc.get("inStock") != null)
+          assertEquals(doc.get("inStock"), pojo.inStock);
+        if (doc.get("price") != null)
+          assertEquals((Float) doc.get("price"), pojo.price, 0.001);
+      }
+    });
+
+    parser.processResponse(new FastInputStream(null, baos.getbuf(), 0, baos.size()), null);
+
+
+  }
+
+  public void testParsingWithChildDocs() throws IOException {
+    SolrDocument d1 = TestJavaBinCodec.generateSolrDocumentWithChildDocs();
+    d1.setField("id", "101");
+    SolrDocument d2 = TestJavaBinCodec.generateSolrDocumentWithChildDocs();
+    d2.setField("id", "102");
+    d2.setField("longs", Arrays.asList(100l, 200l));
+
+    SolrDocumentList sdocs = new SolrDocumentList();
+    sdocs.setStart(0);
+    sdocs.setNumFound(2);
+    sdocs.add(d1);
+    sdocs.add(d2);
+
+    SimpleOrderedMap orderedMap = new SimpleOrderedMap();
+    orderedMap.add("response", sdocs);
+
+    BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+    new JavaBinCodec().marshal(orderedMap, baos);
+    boolean[] useListener = new boolean[1];
+    useListener[0] = true;
+
+    class Pojo {
+      CharSequence id;
+      CharSequence subject;
+      CharSequence cat;
+      long[] longs;
+      final List<Pojo> children = new ArrayList<>();
+
+      public void compare(SolrDocument d) {
+        assertEquals(id, d.getFieldValue("id"));
+        assertEquals(subject, d.getFieldValue("subject"));
+        assertEquals(cat, d.getFieldValue("cat"));
+        assertEquals(d.getChildDocumentCount(), children.size());
+        List<Long> l = (List<Long>) d.getFieldValue("longs");
+        if(l != null){
+          assertNotNull(longs);
+          for (int i = 0; i < l.size(); i++) {
+            Long v = l.get(i);
+            assertEquals(v.longValue(), longs[i]);
+          }
+        }
+        List<SolrDocument> childDocuments = d.getChildDocuments();
+        if (childDocuments == null) return;
+        for (int i = 0; i < childDocuments.size(); i++) {
+          children.get(i).compare(childDocuments.get(i));
+        }
+
+      }
+
+    }
+    List<Pojo> l = new ArrayList<>();
+    StreamingBinaryResponseParser binaryResponseParser = new StreamingBinaryResponseParser(new FastStreamingDocsCallback() {
+
+      @Override
+      public Object initDocList(Long numFound, Long start, Float maxScore) {
+        return l;
+      }
+
+      @Override
+      public Object startDoc(Object docListObj) {
+        Pojo pojo = new Pojo();
+        ((List) docListObj).add(pojo);
+        return pojo;
+      }
+
+      @Override
+      public void field(DataEntry field, Object docObj) {
+        Pojo pojo = (Pojo) docObj;
+        if (field.name().equals("id")) {
+          pojo.id = field.strValue();
+        } else if (field.name().equals("subject")) {
+          pojo.subject = field.strValue();
+        } else if (field.name().equals("cat")) {
+          pojo.cat = field.strValue();
+        } else if (field.type() == DataEntry.Type.ENTRY_ITER && "longs".equals(field.name())) {
+          if(useListener[0]){
+            field.listenContainer(pojo.longs = new long[field.length()], READLONGS);
+          } else {
+            List<Long> longList = (List<Long>) field.val();
+            pojo.longs = new long[longList.size()];
+            for (int i = 0; i < longList.size(); i++) {
+              pojo.longs[i] = longList.get(i);
+
+            }
+
+          }
+        }
+
+      }
+
+
+      @Override
+      public Object startChildDoc(Object parentDocObj) {
+        Pojo parent = (Pojo) parentDocObj;
+        Pojo child = new Pojo();
+        parent.children.add(child);
+        return child;
+      }
+    });
+    binaryResponseParser.processResponse(new FastInputStream(null, baos.getbuf(), 0, baos.size()), null);
+    for (int i = 0; i < sdocs.size(); i++) {
+      l.get(i).compare(sdocs.get(i));
+    }
+
+    l.clear();
+
+    useListener[0] = false;
+    binaryResponseParser.processResponse(new FastInputStream(null, baos.getbuf(), 0, baos.size()), null);
+    for (int i = 0; i < sdocs.size(); i++) {
+      l.get(i).compare(sdocs.get(i));
+    }
+
+
+  }
+
+  static final DataEntry.EntryListener READLONGS = e -> {
+    if (e.type() != DataEntry.Type.LONG) return;
+    long[] array = (long[]) e.ctx();
+    array[(int) e.index()] = e.longVal();
+
+  };
+}
index e60d8c7..6717375 100644 (file)
@@ -65,7 +65,7 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
     }
   }
 
-  private SolrDocument generateSolrDocumentWithChildDocs() {
+  public static SolrDocument generateSolrDocumentWithChildDocs() {
     SolrDocument parentDocument = new SolrDocument();
     parentDocument.addField("id", "1");
     parentDocument.addField("subject", "parentDocument");
@@ -376,8 +376,8 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
   }
 
   private void testPerf() throws InterruptedException {
-    final ArrayList<JavaBinCodec.StringBytes> l = new ArrayList<>();
-    Cache<JavaBinCodec.StringBytes, String> cache = null;
+    final ArrayList<StringBytes> l = new ArrayList<>();
+    Cache<StringBytes, String> cache = null;
    /* cache = new ConcurrentLRUCache<JavaBinCodec.StringBytes,String>(10000, 9000, 10000, 1000, false, true, null){
       @Override
       public String put(JavaBinCodec.StringBytes key, String val) {
@@ -388,12 +388,12 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
     Runtime.getRuntime().gc();
     printMem("before cache init");
 
-    Cache<JavaBinCodec.StringBytes, String> cache1 = new MapBackedCache<>(new HashMap<>()) ;
+    Cache<StringBytes, String> cache1 = new MapBackedCache<>(new HashMap<>()) ;
     final JavaBinCodec.StringCache STRING_CACHE = new JavaBinCodec.StringCache(cache1);
 
 //    STRING_CACHE = new JavaBinCodec.StringCache(cache);
     byte[] bytes = new byte[0];
-    JavaBinCodec.StringBytes stringBytes = new JavaBinCodec.StringBytes(null,0,0);
+    StringBytes stringBytes = new StringBytes(null,0,0);
 
     for(int i=0;i<10000;i++) {
       String s = String.valueOf(random().nextLong());
@@ -410,9 +410,9 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
     int THREADS = 10;
 
     runInThreads(THREADS, () -> {
-      JavaBinCodec.StringBytes stringBytes1 = new JavaBinCodec.StringBytes(new byte[0], 0, 0);
+      StringBytes stringBytes1 = new StringBytes(new byte[0], 0, 0);
       for (int i = 0; i < ITERS; i++) {
-        JavaBinCodec.StringBytes b = l.get(i % l.size());
+        StringBytes b = l.get(i % l.size());
         stringBytes1.reset(b.bytes, 0, b.bytes.length);
         if (STRING_CACHE.get(stringBytes1) == null) throw new RuntimeException("error");
       }
@@ -429,7 +429,7 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
       String a = null;
       CharArr arr = new CharArr();
       for (int i = 0; i < ITERS; i++) {
-        JavaBinCodec.StringBytes sb = l.get(i % l.size());
+        StringBytes sb = l.get(i % l.size());
         arr.reset();
         ByteUtils.UTF8toUTF16(sb.bytes, 0, sb.bytes.length, arr);
         a = arr.toString();
index bf3fd26..0483293 100644 (file)
@@ -83,7 +83,7 @@ public class Utf8CharSequenceTest extends SolrTestCaseJ4 {
 
     NamedList nl1 = (NamedList) new JavaBinCodec()
         .setReadStringAsCharSeq(true)
-        .unmarshal(new FastInputStream(null, bytes, 0, bytes.length));
+        .unmarshal(new ByteArrayInputStream( bytes, 0, bytes.length));
     byte[] buf = ((ByteArrayUtf8CharSequence) nl1.getVal(0)).getBuf();
     ByteArrayUtf8CharSequence valLong = (ByteArrayUtf8CharSequence) nl1.get("key_long");
     assertFalse(valLong.getBuf() == buf);