[EAGLE-1081] Checkstyle fixes for eagle-entity-base module
author: Colm O hEigeartaigh <coheigea@apache.org>
Wed, 7 Feb 2018 07:07:01 +0000 (23:07 -0800)
committer: Jay Sen <jsenjaliya@paypal.com>
Wed, 7 Feb 2018 07:07:01 +0000 (23:07 -0800)
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[EAGLE-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
       number, if there is one.
 - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

Author: Colm O hEigeartaigh <coheigea@apache.org>

Closes #985 from coheigea/EAGLE-1081.

123 files changed:
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/EntityContext.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/EntityJsonModule.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/MapEntrySerializer.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/NoSuchRowException.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/RowkeyAPIEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogAPIEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/base/taggedlog/TaggedLogObjectMapper.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/AbstractHBaseLogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/BaseEntityRepository.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityCreationListener.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityQualifierUtils.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/EntityUniq.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericCreateAPIResponseEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityBatchReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/ListQueryAPIResponseEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/LogWriter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/MetricMetadataEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierCreationListener.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/QualifierNotDefinedException.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyQueryAPIResponseEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/SearchCondition.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/StreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/BooleanExpressionComparator.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinition.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogReader.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/LogDeleter.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/RowkeyHelper.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/Schema.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/DefaultEntityRepository.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepository.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/test/TestEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/test/TestLogAPIEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/test/TestTimeSeriesAPIEntity.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/expression/ExpressionParser.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/expression/ParsiiInvalidException.java
eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/expression/ParsiiUnknowVariableException.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/TestGenericServiceAPIResponseEntity.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestDouble2DArraySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestDoubleSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestHBaseIntegerLogHelper.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestHBaseWriteEntitiesPerformance.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestHBaseWritePerformance.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestTestLogAPIEntity.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/base/taggedlog/TestTaggedLogAPIEntity.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/filter/TestEntityQualifierHelper.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/filter/TestExpressionComparator.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/filter/TestHBaseFilterBuilder.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/filter/TestTypedByteArrayComparator.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/meta/TestArraySerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/meta/TestEntityDefinitionManager.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/meta/TestListSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/meta/TestMapSerDeser.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/repo/TestEntityRepositoryScanner.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/expression/TestExpressionParser.java
eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/expression/TestExpressionPerformance.java

index 17b3fdb..14245df 100644 (file)
@@ -20,21 +20,21 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class EntityContext {
-       private Map<String, Object> context;
+    private Map<String, Object> context;
 
-       public Map<String, Object> getContext() {
-               return context;
-       }
-       
-       public EntityContext() {
-               this.context = new HashMap<>();
-       }
-       
-       protected EntityContext(EntityContext context) {
-               this.context = new HashMap<>(context.context);
-       }
-       
-       public EntityContext cloneEntity() {
-               return new EntityContext(this);
-       }
+    public Map<String, Object> getContext() {
+        return context;
+    }
+
+    public EntityContext() {
+        this.context = new HashMap<>();
+    }
+
+    protected EntityContext(EntityContext context) {
+        this.context = new HashMap<>(context.context);
+    }
+
+    public EntityContext cloneEntity() {
+        return new EntityContext(this);
+    }
 }
index fb86fa6..c291528 100644 (file)
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import java.util.Map;\r
 \r
 public class EntityJsonModule extends SimpleModule {\r
-    public EntityJsonModule(){\r
-        addSerializer(Map.Entry.class,new MapEntrySerializer());\r
+    public EntityJsonModule() {\r
+        addSerializer(Map.Entry.class, new MapEntrySerializer());\r
     }\r
-}
\ No newline at end of file
+}\r
index 4cebbf6..8a80cf0 100644 (file)
@@ -29,10 +29,11 @@ public class MapEntrySerializer extends JsonSerializer<Map.Entry> {
     private static final String VALUE_FIELD = "value";\r
 \r
     @Override\r
-    public void serialize(Map.Entry entry, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {\r
+    public void serialize(Map.Entry entry, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)\r
+        throws IOException, JsonProcessingException {\r
         jsonGenerator.writeStartObject();\r
-        jsonGenerator.writeObjectField(KEY_FIELD,entry.getKey());\r
-        jsonGenerator.writeObjectField(VALUE_FIELD,entry.getValue());\r
+        jsonGenerator.writeObjectField(KEY_FIELD, entry.getKey());\r
+        jsonGenerator.writeObjectField(VALUE_FIELD, entry.getValue());\r
         jsonGenerator.writeEndObject();\r
     }\r
 }\r
index 3304bea..658c20a 100644 (file)
  */
 package org.apache.eagle.log.base.taggedlog;
 
-public class NoSuchRowException extends RuntimeException{
-       static final long serialVersionUID = -4538233994503905943L;
+public class NoSuchRowException extends RuntimeException {
+    static final long serialVersionUID = -4538233994503905943L;
 
-       public NoSuchRowException(){
-               super();
-       }
-       
-       public NoSuchRowException(String s){
-               super(s);
-       }
+    public NoSuchRowException() {
+        super();
+    }
+
+    public NoSuchRowException(String s) {
+        super(s);
+    }
 }
index d72c35a..3c4166c 100644 (file)
@@ -25,57 +25,72 @@ import javax.xml.bind.annotation.XmlType;
 
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(propOrder = {"success", "exception", "prefixHashCode", "timestamp", "humanTime", "tagNameHashValueHashMap", "fieldNameValueMap"})
+@XmlType(propOrder = {
+                      "success", "exception", "prefixHashCode", "timestamp", "humanTime",
+                      "tagNameHashValueHashMap", "fieldNameValueMap"
+    })
 public class RowkeyAPIEntity {
-       boolean success;
-       String exception;
-       int prefixHashCode;
-       long timestamp;
-       String humanTime;
-       Map<Integer, Integer> tagNameHashValueHashMap;
-       Map<String, String> fieldNameValueMap;
-       
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-       public void setException(String exception) {
-               this.exception = exception;
-       }
-       public String getHumanTime() {
-               return humanTime;
-       }
-       public void setHumanTime(String humanTime) {
-               this.humanTime = humanTime;
-       }
-       public int getPrefixHashCode() {
-               return prefixHashCode;
-       }
-       public void setPrefixHashCode(int prefixHashcode) {
-               this.prefixHashCode = prefixHashcode;
-       }
-       public long getTimestamp() {
-               return timestamp;
-       }
-       public void setTimestamp(long timestamp) {
-               this.timestamp = timestamp;
-       }
-       public Map<Integer, Integer> getTagNameHashValueHashMap() {
-               return tagNameHashValueHashMap;
-       }
-       public void setTagNameHashValueHashMap(
-                       Map<Integer, Integer> tagNameHashValueHashMap) {
-               this.tagNameHashValueHashMap = tagNameHashValueHashMap;
-       }
-       public Map<String, String> getFieldNameValueMap() {
-               return fieldNameValueMap;
-       }
-       public void setFieldNameValueMap(Map<String, String> fieldNameValueMap) {
-               this.fieldNameValueMap = fieldNameValueMap;
-       }
+    boolean success;
+    String exception;
+    int prefixHashCode;
+    long timestamp;
+    String humanTime;
+    Map<Integer, Integer> tagNameHashValueHashMap;
+    Map<String, String> fieldNameValueMap;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    public String getHumanTime() {
+        return humanTime;
+    }
+
+    public void setHumanTime(String humanTime) {
+        this.humanTime = humanTime;
+    }
+
+    public int getPrefixHashCode() {
+        return prefixHashCode;
+    }
+
+    public void setPrefixHashCode(int prefixHashcode) {
+        this.prefixHashCode = prefixHashcode;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public Map<Integer, Integer> getTagNameHashValueHashMap() {
+        return tagNameHashValueHashMap;
+    }
+
+    public void setTagNameHashValueHashMap(Map<Integer, Integer> tagNameHashValueHashMap) {
+        this.tagNameHashValueHashMap = tagNameHashValueHashMap;
+    }
+
+    public Map<String, String> getFieldNameValueMap() {
+        return fieldNameValueMap;
+    }
+
+    public void setFieldNameValueMap(Map<String, String> fieldNameValueMap) {
+        this.fieldNameValueMap = fieldNameValueMap;
+    }
 }
index b396b06..8e6d314 100755 (executable)
@@ -42,10 +42,8 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * rowkey: prefix + timestamp + tagNameValues
- * as of now, all tags will be persisted as a column in hbase table
- * tag name is column qualifier name
- * tag value is column value.
+ * rowkey: prefix + timestamp + tagNameValues as of now, all tags will be persisted as a column in hbase table
+ * tag name is column qualifier name tag value is column value.
  */
 @JsonFilter(TaggedLogAPIEntity.PropertyBeanFilterName)
 public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable {
@@ -63,17 +61,14 @@ public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable
     }
 
     /**
-     * Extra dynamic attributes.
-     * TODO: can we move exp, serializeAlias, serializeVerbose to a wrapper class?
+     * Extra dynamic attributes. TODO: can we move exp, serializeAlias, serializeVerbose to a wrapper class?
      */
     private Map<String, Object> exp;
 
     private String encodedRowkey;
     // track what qualifiers are changed
     private Set<String> modifiedProperties = new HashSet<String>();
-    protected PropertyChangeSupport pcs
-            = new PropertyChangeSupport(this);
-
+    protected PropertyChangeSupport pcs = new PropertyChangeSupport(this);
 
     public Map<String, String> getSerializeAlias() {
         return serializeAlias;
@@ -135,6 +130,7 @@ public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable
         pcs.firePropertyChange(fieldModified, null, null);
     }
 
+    @Override
     public void propertyChange(PropertyChangeEvent evt) {
         modifiedProperties.add(evt.getPropertyName());
     }
@@ -143,6 +139,7 @@ public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable
         return this.modifiedProperties;
     }
 
+    @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append("prefix:");
@@ -189,22 +186,26 @@ public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable
         };
 
         @Override
-        public void serializeAsField(Object pojo, JsonGenerator jgen, SerializerProvider provider, PropertyWriter writer) throws Exception {
+        public void serializeAsField(Object pojo, JsonGenerator jgen, SerializerProvider provider,
+                                     PropertyWriter writer)
+            throws Exception {
             if (pojo instanceof TaggedLogAPIEntity) {
-                TaggedLogAPIEntity entity = (TaggedLogAPIEntity) pojo;
+                TaggedLogAPIEntity entity = (TaggedLogAPIEntity)pojo;
                 Set<String> modified = entity.modifiedQualifiers();
                 Set<String> basePropertyNames = getPropertyNames();
                 String writerName = writer.getName();
                 if (modified.contains(writerName) || basePropertyNames.contains(writerName)) {
                     if ((!entity.isSerializeVerbose() && verboseFields.contains(writerName))
-                            || (timestamp.equals(writerName) && !EntityDefinitionManager.isTimeSeries(entity.getClass()))) {
+                        || (timestamp.equals(writerName)
+                            && !EntityDefinitionManager.isTimeSeries(entity.getClass()))) {
                         // log skip
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("skip field");
                         }
                     } else {
                         // if serializeAlias is not null and exp is not null
-                        if (exp.equals(writerName) && entity.getSerializeAlias() != null && entity.getExp() != null) {
+                        if (exp.equals(writerName) && entity.getSerializeAlias() != null
+                            && entity.getExp() != null) {
                             Map<String, Object> _exp = new HashMap<String, Object>();
                             for (Map.Entry<String, Object> entry : entity.getExp().entrySet()) {
                                 String alias = entity.getSerializeAlias().get(entry.getKey());
@@ -248,4 +249,4 @@ public class TaggedLogAPIEntity implements PropertyChangeListener, Serializable
         mapper.setFilters(TaggedLogAPIEntity.getFilterProvider());
         return mapper;
     }
-}
\ No newline at end of file
+}
index 1df1c0d..1b712c4 100644 (file)
@@ -19,17 +19,20 @@ package org.apache.eagle.log.base.taggedlog;
 import java.util.Map;
 
 public interface TaggedLogObjectMapper {
-       /**
-        * when read, business logic should convert schema-less key/value into business object based on its own schema
-        * @param entity
-        * @param qualifierValues
-        */
-       public void populateQualifierValues(TaggedLogAPIEntity entity, Map<String, byte[]> qualifierValues);
-       
-       /**
-        * when write, business logic should convert business object to schema-less key value
-        * @param entity
-        * @return
-        */
-       public Map<String, byte[]> createQualifierValues(TaggedLogAPIEntity entity);    
+    /**
+     * when read, business logic should convert schema-less key/value into business object based on its own
+     * schema
+     * 
+     * @param entity
+     * @param qualifierValues
+     */
+    public void populateQualifierValues(TaggedLogAPIEntity entity, Map<String, byte[]> qualifierValues);
+
+    /**
+     * when write, business logic should convert business object to schema-less key value
+     * 
+     * @param entity
+     * @return
+     */
+    public Map<String, byte[]> createQualifierValues(TaggedLogAPIEntity entity);
 }
index 916706f..3823d62 100755 (executable)
@@ -33,208 +33,213 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * HBase Log Reader basic initialization:
+ * HBase Log Reader basic initialization.
  * <ol>
- *   <li>Open HBase connection to target HBase table</li>
- *   <li>Generate HBase filter,start and stop row key, output qualifier and Scan </li>
- *   <li><code>onOpen(HTableInterface,Scan)</code>: Callback abstract method </li>
- *   <li><code>close</code>: Close HBase connection</li>
+ * <li>Open HBase connection to target HBase table</li>
+ * <li>Generate HBase filter,start and stop row key, output qualifier and Scan</li>
+ * <li><code>onOpen(HTableInterface,Scan)</code>: Callback abstract method</li>
+ * <li><code>close</code>: Close HBase connection</li>
  * </ol>
  *
  * @param <T> Reader entity class type
- *
  */
 public abstract class AbstractHBaseLogReader<T> implements LogReader<T> {
-       private static Logger LOG = LoggerFactory.getLogger(AbstractHBaseLogReader.class);
-
-       protected byte[][] qualifiers;
-       private HTableInterface tbl;
-       private byte[] startKey;
-       private byte[] stopKey;
-       protected Map<String, List<String>> searchTags;
-       private Filter filter;
-       private Date startTime;
-       private Date endTime;
-
-//     protected ResultScanner rs;
-       private boolean isOpen = false;
-
-       /**
-        * TODO it's ugly that both _ed and prefix fields can hold prefix information,
-        * prefix field should be in precedence over _ed
-        */
-       private String _prefix;
-       protected EntityDefinition _ed;
-
-       public AbstractHBaseLogReader(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
-                                     Filter filter, String lastScanKey, byte[][] outputQualifiers){
-               this(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, null);
-       }
-       /**
-        * This constructor supports partition.
-        *
-        * @param ed entity definition
-        * @param partitions partition values, which is sorted in partition definition order. TODO: in future we need to support
-        * multiple values for one partition field
-        * @param startTime start time of the query
-        * @param endTime end time of the query
-        * @param filter filter for the hbase scan
-        * @param lastScanKey the key of last scan
-        * @param outputQualifiers the bytes of output qualifier names
-        * @param prefix can be populated from outside world specifically for generic metric reader
-        */
-       public AbstractHBaseLogReader(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
-                                     Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix){
-               this.startTime = startTime;
-               this.endTime = endTime;
-               this._ed = ed;
-               if (_ed.getPartitions() != null) {
-                       if (partitions == null || _ed.getPartitions().length != partitions.size()) {
-                               throw new IllegalArgumentException("Invalid argument. Entity " + ed.getClass().getSimpleName() + " defined "
-                                               + "partitions, but argument partitions is null or number of partition values are different!");
-                       }
-               }
-               /**
-                * decide prefix field value
-                */
-               if(prefix == null || prefix.isEmpty()){
-                       this._prefix = _ed.getPrefix();
-               }else{
-                       this._prefix = prefix;
-               }
-               this.qualifiers = outputQualifiers;
-               this.filter = filter;
-
-               this.startKey = buildRowKey(this._prefix, partitions, startTime);
-               
-               
-               /**
-                * startTime should be inclusive, -128 is max value for hbase Bytes comparison, see PureJavaComparer.compareTo
-                * as an alternative, we can use startTime-1000 and endTime-1000 to make sure startTime is inclusive and endTime is exclusive
-                */
-               this.startKey = ByteUtil.concat(this.startKey, new byte[] {-1, -1,-1,-1});
-               if (lastScanKey == null) {
-                       this.stopKey = buildRowKey(this._prefix, partitions, endTime);
-                       // endTime should be exclusive
-                       this.stopKey = ByteUtil.concat(this.stopKey, new byte[] {-1,-1,-1,-1,-1});
-               } else {
-                       // build stop key
-                       this.stopKey = EagleBase64Wrapper.decode(lastScanKey);
-                       // TODO to-be-fixed, probably it's an issue because contacting 1 is not
-                       // enough for lexicographical sorting
-                       this.stopKey = ByteUtil.concat(this.stopKey, new byte[] { 1 });
-               }
-       }
-       
-       /**
-        * TODO If the required field is null for a row, then this row will not be fetched. That could be a problem for counting
-        * Need another version of read to strictly get the number of rows which will return all the columns for a column family
-        */
-       @Override
-       public void open() throws IOException {
-               if (isOpen)
-                       return; // silently return
-               try {
-                       tbl = EagleConfigFactory.load().getHTable(_ed.getTable());
-               } catch (RuntimeException ex) {
-                       throw new IOException(ex);
-               }
-
-               Scan s1 = new Scan();
-               // reverse timestamp, startRow is stopKey, and stopRow is startKey
-               s1.setStartRow(stopKey);
-               s1.setStopRow(startKey);
-               s1.setFilter(filter);
-               // TODO the # of cached rows should be minimum of (pagesize and 100)
-               int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
-               s1.setCaching(cs);
-               // TODO not optimized for all applications
-               s1.setCacheBlocks(true)
-               ;
-               // scan specified columnfamily and qualifiers
-               if(this.qualifiers == null) {
-                       // Filter all
-                       s1.addFamily(_ed.getColumnFamily().getBytes());
-               }else{
-                       for (byte[] qualifier : qualifiers) {
-                               s1.addColumn(_ed.getColumnFamily().getBytes(), qualifier);
-                       }
-               }
-               // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation should use SingleColumnValueExcludeFilter, 
-               // but it's complicated in current implementation. 
-               workaroundHBASE2198(s1, filter);
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(s1.toString());
-               }
-//             rs = tbl.getScanner(s1);
-               this.onOpen(tbl,s1);
-               isOpen = true;
-       }
-
-       /**
-        * HBase table connection callback function
-        *
-        * @param tbl   HBase table connection
-        * @param scan  HBase scan
-        * @throws IOException
-        */
-       protected abstract void onOpen(HTableInterface tbl,Scan scan) throws IOException;
-
-       /**
-        * <h2>History</h2>
-        * <ul>
-        *      <li><b>Nov 19th, 2014</b>: Fix for out put all qualifiers</li>
-        * </ul>
-        * @param s1
-        * @param filter
-        */
-       protected void workaroundHBASE2198(Scan s1, Filter filter) {
-               if (filter instanceof SingleColumnValueFilter) {
-                       if(this.qualifiers == null){
-                               s1.addFamily(((SingleColumnValueFilter) filter).getFamily());
-                       }else {
-                               s1.addColumn(((SingleColumnValueFilter) filter).getFamily(), ((SingleColumnValueFilter) filter).getQualifier());
-                       }
-                       return;
-               }
-               if (filter instanceof FilterList) {
-                       for (Filter f : ((FilterList)filter).getFilters()) {
-                               workaroundHBASE2198(s1, f);
-                       }
-               }
-       }
-
-       /**
-        * <h2>Close:</h2>
-        * 1. release current table connection
-        *
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               if(tbl != null){
-                       new HTableFactory().releaseHTableInterface(tbl);
-               }
-//             if(rs != null){
-//                     rs.close();
-//             }
-       }
-
-       private static byte[] buildRowKey(String prefix, List<String> partitions, Date t){
-               final int length = (partitions == null) ? (4 + 8) : (4 + 8 + partitions.size() * 4);
-               final byte[] key = new byte[length];
-               int offset = 0;
-               ByteUtil.intToBytes(prefix.hashCode(), key, offset);
-               offset += 4;
-               if (partitions != null) {
-                       for (String partition : partitions) {
-                               ByteUtil.intToBytes(partition.hashCode(), key, offset);
-                               offset += 4;
-                       }
-               }
-               // reverse timestamp
-               long ts = Long.MAX_VALUE - t.getTime();
-               ByteUtil.longToBytes(ts, key, offset);
-               return key;
-       }
+    private static Logger LOG = LoggerFactory.getLogger(AbstractHBaseLogReader.class);
+
+    protected byte[][] qualifiers;
+    private HTableInterface tbl;
+    private byte[] startKey;
+    private byte[] stopKey;
+    protected Map<String, List<String>> searchTags;
+    private Filter filter;
+    private Date startTime;
+    private Date endTime;
+
+    // protected ResultScanner rs;
+    private boolean isOpen = false;
+
+    /**
+     * TODO it's ugly that both ed and prefix fields can hold prefix information; the prefix field
+     * should take precedence over ed.
+     */
+    private String prefix;
+    protected EntityDefinition ed;
+
+    public AbstractHBaseLogReader(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+                                  Filter filter, String lastScanKey, byte[][] outputQualifiers) {
+        this(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, null);
+    }
+
+    /**
+     * This constructor supports partition.
+     *
+     * @param ed entity definition
+     * @param partitions partition values, which is sorted in partition definition order. TODO: in future we
+     *            need to support multiple values for one partition field
+     * @param startTime start time of the query
+     * @param endTime end time of the query
+     * @param filter filter for the hbase scan
+     * @param lastScanKey the key of last scan
+     * @param outputQualifiers the bytes of output qualifier names
+     * @param prefix can be populated from outside world specifically for generic metric reader
+     */
+    public AbstractHBaseLogReader(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+                                  Filter filter, String lastScanKey, byte[][] outputQualifiers,
+                                  String prefix) {
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.ed = ed;
+        if (ed.getPartitions() != null) {
+            if (partitions == null || ed.getPartitions().length != partitions.size()) {
+                throw new IllegalArgumentException("Invalid argument. Entity " + ed.getClass().getSimpleName()
+                                                   + " defined "
+                                                   + "partitions, but argument partitions is null or number of partition values are different!");
+            }
+        }
+        /**
+         * decide prefix field value
+         */
+        if (prefix == null || prefix.isEmpty()) {
+            this.prefix = ed.getPrefix();
+        } else {
+            this.prefix = prefix;
+        }
+        this.qualifiers = outputQualifiers;
+        this.filter = filter;
+
+        this.startKey = buildRowKey(this.prefix, partitions, startTime);
+
+        /**
+         * startTime should be inclusive; -128 is the max value for hbase Bytes comparison, see
+         * PureJavaComparer.compareTo. As an alternative, we can use startTime-1000 and endTime-1000 to
+         * make sure startTime is inclusive and endTime is exclusive.
+         */
+        this.startKey = ByteUtil.concat(this.startKey, new byte[] { -1, -1, -1, -1 });
+        if (lastScanKey == null) {
+            this.stopKey = buildRowKey(this.prefix, partitions, endTime);
+            // endTime should be exclusive
+            this.stopKey = ByteUtil.concat(this.stopKey, new byte[] { -1, -1, -1, -1, -1 });
+        } else {
+            // build stop key
+            this.stopKey = EagleBase64Wrapper.decode(lastScanKey);
+            // TODO to-be-fixed, probably it's an issue because concatenating 1 is not
+            // enough for lexicographical sorting
+            this.stopKey = ByteUtil.concat(this.stopKey, new byte[] { 1 });
+        }
+    }
+
+    /**
+     * TODO If the required field is null for a row, then this row will not be fetched. That could be a
+     * problem for counting. Need another version of read to strictly get the number of rows, which will
+     * return all the columns for a column family.
+     */
+    @Override
+    public void open() throws IOException {
+        if (isOpen) {
+            return; // silently return
+        }
+        try {
+            tbl = EagleConfigFactory.load().getHTable(ed.getTable());
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        }
+
+        Scan s1 = new Scan();
+        // reverse timestamp, startRow is stopKey, and stopRow is startKey
+        s1.setStartRow(stopKey);
+        s1.setStopRow(startKey);
+        s1.setFilter(filter);
+        // TODO the # of cached rows should be minimum of (pagesize and 100)
+        int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
+        s1.setCaching(cs);
+        // TODO not optimized for all applications
+        s1.setCacheBlocks(true);
+        // scan specified columnfamily and qualifiers
+        if (this.qualifiers == null) {
+            // Filter all
+            s1.addFamily(ed.getColumnFamily().getBytes());
+        } else {
+            for (byte[] qualifier : qualifiers) {
+                s1.addColumn(ed.getColumnFamily().getBytes(), qualifier);
+            }
+        }
+        // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation
+        // should use SingleColumnValueExcludeFilter,
+        // but it's complicated in current implementation.
+        workaroundHBASE2198(s1, filter);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(s1.toString());
+        }
+        // rs = tbl.getScanner(s1);
+        this.onOpen(tbl, s1);
+        isOpen = true;
+    }
+
+    /**
+     * HBase table connection callback function.
+     *
+     * @param tbl HBase table connection
+     * @param scan HBase scan
+     * @throws IOException
+     */
+    protected abstract void onOpen(HTableInterface tbl, Scan scan) throws IOException;
+
+    /**
+     * <h2>History</h2>.
+     * <ul>
+ * <li><b>Nov 19th, 2014</b>: Fix to output all qualifiers</li>
+     * </ul>
+     *
+     * @param s1
+     * @param filter
+     */
+    protected void workaroundHBASE2198(Scan s1, Filter filter) {
+        if (filter instanceof SingleColumnValueFilter) {
+            if (this.qualifiers == null) {
+                s1.addFamily(((SingleColumnValueFilter)filter).getFamily());
+            } else {
+                s1.addColumn(((SingleColumnValueFilter)filter).getFamily(),
+                             ((SingleColumnValueFilter)filter).getQualifier());
+            }
+            return;
+        }
+        if (filter instanceof FilterList) {
+            for (Filter f : ((FilterList)filter).getFilters()) {
+                workaroundHBASE2198(s1, f);
+            }
+        }
+    }
+
+    /**
+     * <h2>Close:</h2> 1. release current table connection
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        if (tbl != null) {
+            new HTableFactory().releaseHTableInterface(tbl);
+        }
+        // if(rs != null){
+        // rs.close();
+        // }
+    }
+
+    private static byte[] buildRowKey(String prefix, List<String> partitions, Date t) {
+        final int length = (partitions == null) ? (4 + 8) : (4 + 8 + partitions.size() * 4);
+        final byte[] key = new byte[length];
+        int offset = 0;
+        ByteUtil.intToBytes(prefix.hashCode(), key, offset);
+        offset += 4;
+        if (partitions != null) {
+            for (String partition : partitions) {
+                ByteUtil.intToBytes(partition.hashCode(), key, offset);
+                offset += 4;
+            }
+        }
+        // reverse timestamp
+        long ts = Long.MAX_VALUE - t.getTime();
+        ByteUtil.longToBytes(ts, key, offset);
+        return key;
+    }
 }
index 71253da..f7de525 100755 (executable)
@@ -18,9 +18,9 @@ package org.apache.eagle.log.entity;
 
 import org.apache.eagle.log.entity.repo.EntityRepository;
 
-public class BaseEntityRepository  extends EntityRepository {
+public class BaseEntityRepository extends EntityRepository {
 
-       public BaseEntityRepository() {
-               entitySet.add(GenericMetricEntity.class);
-       }
+    public BaseEntityRepository() {
+        entitySet.add(GenericMetricEntity.class);
+    }
 }
index 4ad8959..37b163c 100644 (file)
@@ -19,8 +19,8 @@ package org.apache.eagle.log.entity;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
 /**
- * Interface to notify creation event of an entity 
+ * Interface to notify creation event of an entity.
  */
 public interface EntityCreationListener {
-       public void entityCreated(TaggedLogAPIEntity entity) throws Exception;
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception;
 }
index 6e5cb5c..4747760 100755 (executable)
@@ -34,248 +34,274 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class EntityQualifierUtils {
-       private final static Logger LOG = LoggerFactory.getLogger(EntityQualifierUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(EntityQualifierUtils.class);
 
-       public static Map<String,Object> keyValuesToMap(List<KeyValue> row,EntityDefinition ed){
-               Map<String,Object> result = new HashMap<String,Object>();
-               for(KeyValue kv:row){
-                       String qualifierName = new String(kv.getQualifier());
-                       if(!ed.isTag(qualifierName)){
-                               Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
-                               if(qualifier == null){
-                                       qualifier = ed.getQualifierNameMap().get(qualifierName);
-                               }
-                               qualifierName = qualifier.getDisplayName();
-                               Object value = qualifier.getSerDeser().deserialize(kv.getValue());
-                               result.put(qualifierName,value);
-                       }else{
-                               result.put(qualifierName,new String(kv.getValue()));
-                       }
-               }
-               return result;
-       }
+    public static Map<String, Object> keyValuesToMap(List<KeyValue> row, EntityDefinition ed) {
+        Map<String, Object> result = new HashMap<String, Object>();
+        for (KeyValue kv : row) {
+            String qualifierName = new String(kv.getQualifier());
+            if (!ed.isTag(qualifierName)) {
+                Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
+                if (qualifier == null) {
+                    qualifier = ed.getQualifierNameMap().get(qualifierName);
+                }
+                qualifierName = qualifier.getDisplayName();
+                Object value = qualifier.getSerDeser().deserialize(kv.getValue());
+                result.put(qualifierName, value);
+            } else {
+                result.put(qualifierName, new String(kv.getValue()));
+            }
+        }
+        return result;
+    }
 
-       public static Map<String,Double> keyValuesToDoubleMap(List<KeyValue> row,EntityDefinition ed){
-               Map<String,Double> result = new HashMap<String,Double>();
-               for(KeyValue kv:row){
-                       String qualifierName = new String(kv.getQualifier());
-                       if(!ed.isTag(qualifierName)){
-                               Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
-                               if(qualifier == null){
-                                       qualifier = ed.getQualifierNameMap().get(qualifierName);
-                               }
-                               qualifierName = qualifier.getDisplayName();
-                               Object value = qualifier.getSerDeser().deserialize(kv.getValue());
-                               result.put(qualifierName,convertObjToDouble(value));
-                       }else{
-                               result.put(qualifierName,Double.NaN);
-                       }
-               }
-               return result;
-       }
+    public static Map<String, Double> keyValuesToDoubleMap(List<KeyValue> row, EntityDefinition ed) {
+        Map<String, Double> result = new HashMap<String, Double>();
+        for (KeyValue kv : row) {
+            String qualifierName = new String(kv.getQualifier());
+            if (!ed.isTag(qualifierName)) {
+                Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
+                if (qualifier == null) {
+                    qualifier = ed.getQualifierNameMap().get(qualifierName);
+                }
+                qualifierName = qualifier.getDisplayName();
+                Object value = qualifier.getSerDeser().deserialize(kv.getValue());
+                result.put(qualifierName, convertObjToDouble(value));
+            } else {
+                result.put(qualifierName, Double.NaN);
+            }
+        }
+        return result;
+    }
 
-       /**
-        * Map[Display Name,Double Value]
-        *
-        * @param map
-        * @param ed
-        * @return
-        */
-       public static Map<String,Double> bytesMapToDoubleMap(Map<String,byte[]> map,EntityDefinition ed){
-               Map<String,Double> result = new HashMap<String,Double>();
-               for(Map.Entry<String,byte[]> entry:map.entrySet()){
-                       String qualifierName = entry.getKey();
-                       Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
-                       if(qualifier == null) qualifier = ed.getQualifierNameMap().get(qualifierName);
-                       if(qualifier!=null && entry.getValue()!=null) {
-                               qualifierName = qualifier.getDisplayName();
-                               Object value = qualifier.getSerDeser().deserialize(entry.getValue());
-                               result.put(qualifierName, convertObjToDouble(value));
-                       }else{
-                               result.put(qualifierName,null);
-                       }
-               }
-               return result;
-       }
+    /**
+     * Map[Display Name,Double Value]
+     *
+     * @param map
+     * @param ed
+     * @return
+     */
+    public static Map<String, Double> bytesMapToDoubleMap(Map<String, byte[]> map, EntityDefinition ed) {
+        Map<String, Double> result = new HashMap<String, Double>();
+        for (Map.Entry<String, byte[]> entry : map.entrySet()) {
+            String qualifierName = entry.getKey();
+            Qualifier qualifier = ed.getDisplayNameMap().get(qualifierName);
+            if (qualifier == null) {
+                qualifier = ed.getQualifierNameMap().get(qualifierName);
+            }
+            if (qualifier != null && entry.getValue() != null) {
+                qualifierName = qualifier.getDisplayName();
+                Object value = qualifier.getSerDeser().deserialize(entry.getValue());
+                result.put(qualifierName, convertObjToDouble(value));
+            } else {
+                result.put(qualifierName, null);
+            }
+        }
+        return result;
+    }
 
-       public static byte[] toBytes(EntityDefinition ed, String qualifierName, String qualifierValueInStr){
-               // Get field type from entity class
-               // and skip for not-found fields query expression
-               Object typedValue = null;
-               EntitySerDeser serDeser = null;
-               if(ed.isTag(qualifierName)){
-                       typedValue = qualifierValueInStr;
-                       serDeser = EntityDefinitionManager.getSerDeser(String.class);
-               }else{
-                       try{
-                               Field field = ed.getEntityClass().getDeclaredField(qualifierName);
-                               Class<?> fieldType = field.getType();
-                               serDeser =  EntityDefinitionManager.getSerDeser(fieldType);
-                               if(serDeser == null){
-                                       throw new IllegalArgumentException("Can't find EntitySerDeser for field: "+ qualifierName +"'s type: "+fieldType
-                                                       +", so the field is not supported to be filtered yet");
-                               }
-                               typedValue = convertStringToObject(qualifierValueInStr, fieldType);
-                       } catch (NoSuchFieldException ex) {
-                               // Handle the field not found exception in caller
-                               LOG.error("Field " + qualifierName + " not found in " + ed.getEntityClass());
-                               throw new IllegalArgumentException("Field "+qualifierName+" not found in "+ed.getEntityClass(),ex);
-                       }
-               }
-               return serDeser.serialize(typedValue);
-       }
+    public static byte[] toBytes(EntityDefinition ed, String qualifierName, String qualifierValueInStr) {
+        // Get field type from entity class
+        // and skip for not-found fields query expression
+        Object typedValue = null;
+        EntitySerDeser serDeser = null;
+        if (ed.isTag(qualifierName)) {
+            typedValue = qualifierValueInStr;
+            serDeser = EntityDefinitionManager.getSerDeser(String.class);
+        } else {
+            try {
+                Field field = ed.getEntityClass().getDeclaredField(qualifierName);
+                Class<?> fieldType = field.getType();
+                serDeser = EntityDefinitionManager.getSerDeser(fieldType);
+                if (serDeser == null) {
+                    throw new IllegalArgumentException("Can't find EntitySerDeser for field: " + qualifierName
+                                                       + "'s type: " + fieldType
+                                                       + ", so the field is not supported to be filtered yet");
+                }
+                typedValue = convertStringToObject(qualifierValueInStr, fieldType);
+            } catch (NoSuchFieldException ex) {
+                // Handle the field not found exception in caller
+                LOG.error("Field " + qualifierName + " not found in " + ed.getEntityClass());
+                throw new IllegalArgumentException("Field " + qualifierName + " not found in "
+                                                   + ed.getEntityClass(), ex);
+            }
+        }
+        return serDeser.serialize(typedValue);
+    }
 
-       public static Class<?> getType(EntityDefinition ed, String qualifierName) {
-               Field field;
-               try {
-                       field = ed.getEntityClass().getDeclaredField(qualifierName);
-               } catch (NoSuchFieldException e) {
-                       if(LOG.isDebugEnabled()) LOG.debug("Field "+qualifierName+" not found in "+ed.getEntityClass());
-                       return null;
-               }
-               return field.getType();
-       }
+    public static Class<?> getType(EntityDefinition ed, String qualifierName) {
+        Field field;
+        try {
+            field = ed.getEntityClass().getDeclaredField(qualifierName);
+        } catch (NoSuchFieldException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Field " + qualifierName + " not found in " + ed.getEntityClass());
+            }
+            return null;
+        }
+        return field.getType();
+    }
 
-       /**
-        * Not support negative numeric value:
-        * - http://en.wikipedia.org/wiki/Double-precision_floating-point_format
-        *
-        * @param value
-        * @param type
-        * @return
-        */
-       public static Object convertStringToObject(String value, Class<?> type){
-               Object obj = null;
-               try{
-                       if(String.class.equals(type)){
-                               obj =  value;
-                       }if(Long.class.equals(type) || long.class.equals(type)){
-                               obj = Long.parseLong(value);
-                               // if((Long) obj < 0) throw new IllegalArgumentException("Don't support negative Long yet: "+obj);
-                       }else if(Integer.class.equals(type) || int.class.equals(type)){
-                               obj = Integer.parseInt(value);
-                               // if((Integer) obj < 0) throw new IllegalArgumentException("Don't support negative Integer yet: "+obj);
-                       }else if(Double.class.equals(type) || double.class.equals(type)){
-                               obj = Double.parseDouble(value);
-                               // if((Double) obj < 0) throw new IllegalArgumentException("Don't support negative Double yet: "+obj);
-                       }else if(Float.class.equals(type) || float.class.equals(type)){
-                               obj = Float.parseFloat(value);
-                               // if((Double) obj < 0) throw new IllegalArgumentException("Don't support negative Float yet: "+obj);
-                       }else if(Boolean.class.equals(type) || boolean.class.equals(type)) {
-                               obj = Boolean.valueOf(value);
-                       }
-                       if(obj != null) return obj;
-               }catch (NumberFormatException ex){
-                       throw new IllegalArgumentException("Fail to convert string: "+value +" into type of "+type,ex);
-               }
+    /**
+     * Does not support negative numeric values:
+     * http://en.wikipedia.org/wiki/Double-precision_floating-point_format
+     *
+     * @param value
+     * @param type
+     * @return
+     */
+    public static Object convertStringToObject(String value, Class<?> type) {
+        Object obj = null;
+        try {
+            if (String.class.equals(type)) {
+                obj = value;
+            }
+            if (Long.class.equals(type) || long.class.equals(type)) {
+                obj = Long.parseLong(value);
+                // if((Long) obj < 0) throw new IllegalArgumentException("Don't support negative Long yet:
+                // "+obj);
+            } else if (Integer.class.equals(type) || int.class.equals(type)) {
+                obj = Integer.parseInt(value);
+                // if((Integer) obj < 0) throw new IllegalArgumentException("Don't support negative Integer
+                // yet: "+obj);
+            } else if (Double.class.equals(type) || double.class.equals(type)) {
+                obj = Double.parseDouble(value);
+                // if((Double) obj < 0) throw new IllegalArgumentException("Don't support negative Double yet:
+                // "+obj);
+            } else if (Float.class.equals(type) || float.class.equals(type)) {
+                obj = Float.parseFloat(value);
+                // if((Double) obj < 0) throw new IllegalArgumentException("Don't support negative Float yet:
+                // "+obj);
+            } else if (Boolean.class.equals(type) || boolean.class.equals(type)) {
+                obj = Boolean.valueOf(value);
+            }
+            if (obj != null) {
+                return obj;
+            }
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("Fail to convert string: " + value + " into type of " + type,
+                                               ex);
+        }
 
-               throw new IllegalArgumentException("Fail to convert string: "+value +" into type of "+type+", illegal type: "+type);
-       }
+        throw new IllegalArgumentException("Fail to convert string: " + value + " into type of " + type
+                                           + ", illegal type: " + type);
+    }
 
-       /**
-        *
-        * @param obj
-        * @return double value, otherwise Double.NaN
-        */
-       public static double convertObjToDouble(Object obj){
-               if(Long.class.equals(obj.getClass()) || long.class.equals(obj.getClass())){
-                       Long _value = (Long) obj;
-                       return _value.doubleValue();
-               }else if(Integer.class.equals(obj.getClass()) || int.class.equals(obj.getClass())){
-                       Integer _value = (Integer) obj;
-                       return _value.doubleValue();
-               }else if(Double.class.equals(obj.getClass()) || double.class.equals(obj.getClass())) {
-                       return (Double) obj;
-               }else if(Float.class.equals(obj.getClass()) || float.class.equals(obj.getClass())) {
-                       Float _value = (Float) obj;
-                       return _value.doubleValue();
-               }else if(Short.class.equals(obj.getClass()) || short.class.equals(obj.getClass())) {
-                       Float _value = (Float) obj;
-                       return _value.doubleValue();
-               }else if(Byte.class.equals(obj.getClass()) || byte.class.equals(obj.getClass())) {
-                       Byte _value = (Byte) obj;
-                       return _value.doubleValue();
-               }
-               LOG.warn("Failed to convert object " + obj.toString() + " in type of " + obj.getClass() + " to double");
-               return Double.NaN;
-       }
+    /**
+     * @param obj
+     * @return double value, otherwise Double.NaN
+     */
+    public static double convertObjToDouble(Object obj) {
+        if (Long.class.equals(obj.getClass()) || long.class.equals(obj.getClass())) {
+            Long _value = (Long)obj;
+            return _value.doubleValue();
+        } else if (Integer.class.equals(obj.getClass()) || int.class.equals(obj.getClass())) {
+            Integer _value = (Integer)obj;
+            return _value.doubleValue();
+        } else if (Double.class.equals(obj.getClass()) || double.class.equals(obj.getClass())) {
+            return (Double)obj;
+        } else if (Float.class.equals(obj.getClass()) || float.class.equals(obj.getClass())) {
+            Float _value = (Float)obj;
+            return _value.doubleValue();
+        } else if (Short.class.equals(obj.getClass()) || short.class.equals(obj.getClass())) {
+            Float _value = (Float)obj;
+            return _value.doubleValue();
+        } else if (Byte.class.equals(obj.getClass()) || byte.class.equals(obj.getClass())) {
+            Byte _value = (Byte)obj;
+            return _value.doubleValue();
+        }
+        LOG.warn("Failed to convert object " + obj.toString() + " in type of " + obj.getClass()
+                 + " to double");
+        return Double.NaN;
+    }
 
-       /**
-        * Parse List String as Set without duplicate items
-        *
-        * <br></br>
-        * Support:
-        * <ul>
-        * <li>normal string: ("a","b") => ["a","b"] </li>
-        * <li>number: (1.5,"b") => [1.5,"b"] </li>
-        * <li>inner string comma: ("va,lue","value",",") => ["va,lue","value",","]</li>
-        * <li>inner escaped chars: ("va\"lue","value") => ["va\"lue","value"]</li>
-        * <li>some bad formats list: ("va"lue","value") => ["va\"lue","value"]</li>
-        * </ul>
-        *
-        * <b>Warning:</b> it will not throw exception if the format is not strictly valid
-        *
-        * @param listValue in format (item1,item2,...)
-        * @return
-        */
-       public static List<String> parseList(String listValue){
-               Matcher matcher = SET_PATTERN.matcher(listValue);
-               if(matcher.find()){
-                       String content = matcher.group(1);
-                       List<String> result = new ArrayList<String>();
-                       StringBuilder str = null;
-                       STATE state = null;
-                       char last = 0;
-                       for(char c: content.toCharArray()){
-                               if(str == null) str = new StringBuilder();
-                               if(c == DOUBLE_QUOTE && last != SLASH){
-                                       // Open or Close String
-                                       if(state == STATE.STRING)
-                                               state = null;
-                                       else state = STATE.STRING;
-                               }else if(c == COMMA && state != STATE.STRING){
-                                       result.add(unescape(str.toString()));
-                                       str = null;
-                                       last = c;
-                                       continue;
-                               }
-                               last = c;
-                               str.append(c);
-                       }
-                       if(str!=null) result.add(unescape(str.toString()));
-                       return result;
-               }else{
-                       LOG.error("Invalid list value: " + listValue);
-                       throw new IllegalArgumentException("Invalid format of list value: "+listValue+", must be in format: (item1,item2,...)");
-               }
-       }
+    /**
+     * Parse List String as Set without duplicate items <br>
+     * <br>
+     * Support:
+     * <ul>
+     * <li>normal string: ("a","b") => ["a","b"]</li>
+     * <li>number: (1.5,"b") => [1.5,"b"]</li>
+     * <li>inner string comma: ("va,lue","value",",") => ["va,lue","value",","]</li>
+     * <li>inner escaped chars: ("va\"lue","value") => ["va\"lue","value"]</li>
+     * <li>some bad formats list: ("va"lue","value") => ["va\"lue","value"]</li>
+     * </ul>
+     * <b>Warning:</b> it will not throw exception if the format is not strictly valid
+     *
+     * @param listValue in format (item1,item2,...)
+     * @return
+     */
+    public static List<String> parseList(String listValue) {
+        Matcher matcher = SET_PATTERN.matcher(listValue);
+        if (matcher.find()) {
+            String content = matcher.group(1);
+            List<String> result = new ArrayList<String>();
+            StringBuilder str = null;
+            STATE state = null;
+            char last = 0;
+            for (char c : content.toCharArray()) {
+                if (str == null) {
+                    str = new StringBuilder();
+                }
+                if (c == DOUBLE_QUOTE && last != SLASH) {
+                    // Open or Close String
+                    if (state == STATE.STRING) {
+                        state = null;
+                    } else {
+                        state = STATE.STRING;
+                    }
+                } else if (c == COMMA && state != STATE.STRING) {
+                    result.add(unescape(str.toString()));
+                    str = null;
+                    last = c;
+                    continue;
+                }
+                last = c;
+                str.append(c);
+            }
+            if (str != null) {
+                result.add(unescape(str.toString()));
+            }
+            return result;
+        } else {
+            LOG.error("Invalid list value: " + listValue);
+            throw new IllegalArgumentException("Invalid format of list value: " + listValue
+                                               + ", must be in format: (item1,item2,...)");
+        }
+    }
 
-       private static String unescape(String str){
-               int start=0,end = str.length();
-               if(str.startsWith("\"")) start = start +1;
-               if(str.endsWith("\"")) end = end -1;
-               str = str.substring(start,end);
-               return StringEscapeUtils.unescapeJava(str);
-       }
+    private static String unescape(String str) {
+        int start = 0;
+        int end = str.length();
+        if (str.startsWith("\"")) {
+            start = start + 1;
+        }
+        if (str.endsWith("\"")) {
+            end = end - 1;
+        }
+        str = str.substring(start, end);
+        return StringEscapeUtils.unescapeJava(str);
+    }
 
-       private final static Pattern SET_PATTERN = Pattern.compile("^\\((.*)\\)$");
-       private final static char COMMA = ',';
-       private final static char DOUBLE_QUOTE = '"';
-       private final static char SLASH = '\\';
-       private static enum STATE{ STRING }
+    private static final Pattern SET_PATTERN = Pattern.compile("^\\((.*)\\)$");
+    private static final char COMMA = ',';
+    private static final char DOUBLE_QUOTE = '"';
+    private static final char SLASH = '\\';
 
+    private static enum STATE {
+        STRING
+    }
 
-
-//  TODO: NOT FINISHED
-//  private final static Map<String,String> ESCAPE_REGEXP=new HashMap<String,String>(){{
-//                     this.put("\\.","\\\\.");
-//     }};
-//
-//     public static String escapeRegExp(String value) {
-//             String _value = value;
-//             for(Map.Entry<String,String> entry:ESCAPE_REGEXP.entrySet()){
-//                     _value = _value.replace(entry.getKey(),entry.getValue());
-//             }
-//             return _value;
-//     }
-}
\ No newline at end of file
+    // TODO: NOT FINISHED
+    // private static final Map<String,String> ESCAPE_REGEXP=new HashMap<String,String>(){{
+    // this.put("\\.","\\\\.");
+    // }};
+    //
+    // public static String escapeRegExp(String value) {
+    // String _value = value;
+    // for(Map.Entry<String,String> entry:ESCAPE_REGEXP.entrySet()){
+    // _value = _value.replace(entry.getKey(),entry.getValue());
+    // }
+    // return _value;
+    // }
+}
index 36e1e0b..df75e33 100755 (executable)
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 /**
- * 
+ *
  */
 package org.apache.eagle.log.entity;
 
@@ -23,45 +23,48 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-
 /**
  * @since Sep 12, 2014
  */
 public class EntityUniq {
-       
-       public Map<String, String> tags;
-       public Long timestamp;
-       public long createdTime; // for cache removal;
-       
-       public EntityUniq(Map<String, String> tags, long timestamp) {
-               this.tags = new HashMap<String, String>(tags);
-               this.timestamp = timestamp;
-               this.createdTime = System.currentTimeMillis();
-       }
-       
-       @Override       
-       public boolean equals(Object obj) {             
-               if (obj instanceof EntityUniq) {
-                       EntityUniq au = (EntityUniq) obj;
-                       if (tags.size() != au.tags.size()) return false;
-                       for (Entry<String, String> keyValue : au.tags.entrySet()) {
-                               boolean keyExist = tags.containsKey(keyValue.getKey());
-                               if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {                           
-                                       return false;
-                               }
-                       }
-                       if (!timestamp.equals(au.timestamp)) return false;
-                       return true;
-               }
-               return false;
-       }
-       
-       @Override
-       public int hashCode() { 
-               int hashCode = 0;
-               for (String value : tags.values()) {
-                       hashCode ^= value.hashCode();   
-               }
-               return hashCode ^= timestamp.hashCode();
-       }
+
+    public Map<String, String> tags;
+    public Long timestamp;
+    public long createdTime; // for cache removal;
+
+    public EntityUniq(Map<String, String> tags, long timestamp) {
+        this.tags = new HashMap<String, String>(tags);
+        this.timestamp = timestamp;
+        this.createdTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof EntityUniq) {
+            EntityUniq au = (EntityUniq)obj;
+            if (tags.size() != au.tags.size()) {
+                return false;
+            }
+            for (Entry<String, String> keyValue : au.tags.entrySet()) {
+                boolean keyExist = tags.containsKey(keyValue.getKey());
+                if (!keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {
+                    return false;
+                }
+            }
+            if (!timestamp.equals(au.timestamp)) {
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int hashCode = 0;
+        for (String value : tags.values()) {
+            hashCode ^= value.hashCode();
+        }
+        return hashCode ^= timestamp.hashCode();
+    }
 }
index e308bc3..e97ecbb 100644 (file)
@@ -28,30 +28,37 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(propOrder = {"success", "exception", "encodedRowkeys"})
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown=true)
-public class GenericCreateAPIResponseEntity{
-       private boolean success;
-       private String exception;
-       private List<String> encodedRowkeys;
-       
-       public List<String> getEncodedRowkeys() {
-               return encodedRowkeys;
-       }
-       public void setEncodedRowkeys(List<String> encodedRowkeys) {
-               this.encodedRowkeys = encodedRowkeys;
-       }
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-       public void setException(String exception) {
-               this.exception = exception;
-       }
+@XmlType(propOrder = {
+                      "success", "exception", "encodedRowkeys"
+    })
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GenericCreateAPIResponseEntity {
+    private boolean success;
+    private String exception;
+    private List<String> encodedRowkeys;
+
+    public List<String> getEncodedRowkeys() {
+        return encodedRowkeys;
+    }
+
+    public void setEncodedRowkeys(List<String> encodedRowkeys) {
+        this.encodedRowkeys = encodedRowkeys;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
 }
index 9c42ab2..71f27c2 100755 (executable)
@@ -23,36 +23,43 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class GenericEntityBatchReader implements EntityCreationListener{
-       private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
-       
-       private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
-       private StreamReader reader;
-       
-       public GenericEntityBatchReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{
-               reader = new GenericEntityStreamReader(serviceName, condition);
-               reader.register(this);
-       }
-       
-       public GenericEntityBatchReader(StreamReader reader) throws InstantiationException, IllegalAccessException{
-               this.reader = reader;
-               reader.register(this);
-       }
-       
-       public long getLastTimestamp() {
-               return reader.getLastTimestamp();
-       }
-       public long getFirstTimestamp(){ return reader.getFirstTimestamp();}
-       
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity){
-               entities.add(entity);
-       }
-       
-       @SuppressWarnings("unchecked")
-       public <T> List<T> read() throws Exception{
-               if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
-               reader.readAsStream();
-               return (List<T>)entities;
-       }
+public class GenericEntityBatchReader implements EntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
+
+    private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+    private StreamReader reader;
+
+    public GenericEntityBatchReader(String serviceName, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException {
+        reader = new GenericEntityStreamReader(serviceName, condition);
+        reader.register(this);
+    }
+
+    public GenericEntityBatchReader(StreamReader reader)
+        throws InstantiationException, IllegalAccessException {
+        this.reader = reader;
+        reader.register(this);
+    }
+
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> List<T> read() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start reading as batch mode");
+        }
+        reader.readAsStream();
+        return (List<T>)entities;
+    }
 }
index a9e03b3..6683c28 100755 (executable)
@@ -36,7 +36,8 @@ public class GenericEntityScanStreamReader extends StreamReader {
     private long lastTimestamp = 0;
     private long firstTimestamp = 0;
 
-    public GenericEntityScanStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
+    public GenericEntityScanStreamReader(String serviceName, SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
         this.prefix = prefix;
         checkNotNull(serviceName, "serviceName");
         this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
@@ -44,7 +45,8 @@ public class GenericEntityScanStreamReader extends StreamReader {
         this.condition = condition;
     }
 
-    public GenericEntityScanStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
+    public GenericEntityScanStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
         this.prefix = prefix;
         checkNotNull(entityDef, "entityDef");
         this.entityDef = entityDef;
@@ -52,12 +54,13 @@ public class GenericEntityScanStreamReader extends StreamReader {
         this.condition = condition;
     }
 
+    @Override
     public long getLastTimestamp() {
         return lastTimestamp;
     }
 
-    private void checkNotNull(Object o, String message){
-        if(o == null){
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
             throw new IllegalArgumentException(message + " should not be null");
         }
     }
@@ -71,30 +74,33 @@ public class GenericEntityScanStreamReader extends StreamReader {
     }
 
     @Override
-    public void readAsStream() throws Exception{
+    public void readAsStream() throws Exception {
         Date start = null;
         Date end = null;
         // shortcut to avoid read when pageSize=0
-        if(condition.getPageSize() <= 0){
+        if (condition.getPageSize() <= 0) {
             return; // return nothing
         }
         // Process the time range if needed
-        if(entityDef.isTimeSeries()){
+        if (entityDef.isTimeSeries()) {
             start = new Date(condition.getStartTime());
             end = new Date(condition.getEndTime());
-        }else{
-            //start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
-            //end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
+        } else {
+            // start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
+            // end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
             start = new Date(EntityConstants.FIXED_READ_START_TIMESTAMP);
             end = new Date(EntityConstants.FIXED_READ_END_TIMESTAMP);
         }
         byte[][] outputQualifiers = null;
-        if(!condition.isOutputAll()) {
+        if (!condition.isOutputAll()) {
             // Generate the output qualifiers
-            outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
+            outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef,
+                                                                          condition.getOutputFields());
         }
-        HBaseLogReader2 reader = new HBaseLogReader2(entityDef, condition.getPartitionValues(), start, end, condition.getFilter(), condition.getStartRowkey(), outputQualifiers, this.prefix);
-        try{
+        HBaseLogReader2 reader = new HBaseLogReader2(entityDef, condition.getPartitionValues(), start, end,
+                                                     condition.getFilter(), condition.getStartRowkey(),
+                                                     outputQualifiers, this.prefix);
+        try {
             reader.open();
             InternalLog log;
             int count = 0;
@@ -103,23 +109,24 @@ public class GenericEntityScanStreamReader extends StreamReader {
                 if (lastTimestamp < entity.getTimestamp()) {
                     lastTimestamp = entity.getTimestamp();
                 }
-                if(firstTimestamp > entity.getTimestamp() || firstTimestamp == 0){
+                if (firstTimestamp > entity.getTimestamp() || firstTimestamp == 0) {
                     firstTimestamp = entity.getTimestamp();
                 }
 
                 entity.setSerializeVerbose(condition.isOutputVerbose());
                 entity.setSerializeAlias(condition.getOutputAlias());
 
-                for(EntityCreationListener l : _listeners){
+                for (EntityCreationListener l : listeners) {
                     l.entityCreated(entity);
                 }
-                if(++count == condition.getPageSize())
+                if (++count == condition.getPageSize()) {
                     break;
+                }
             }
-        }catch(IOException ioe){
+        } catch (IOException ioe) {
             LOG.error("Fail reading log", ioe);
             throw ioe;
-        }finally{
+        } finally {
             reader.close();
         }
     }
index c3d916e..6dfe27d 100755 (executable)
@@ -29,97 +29,103 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class GenericEntityStreamReader extends StreamReader {
-       private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
-       
-       private EntityDefinition entityDef;
-       private SearchCondition condition;
-       private String prefix;
-       private StreamReader readerAfterPlan;
-
-       public GenericEntityStreamReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{
-               this(serviceName, condition, null);
-       }
-
-       public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition) throws InstantiationException, IllegalAccessException{
-               this(entityDef, condition, entityDef.getPrefix());
-       }
-       
-       public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
-               this.prefix = prefix;
-               checkNotNull(serviceName, "serviceName");
-               this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
-               checkNotNull(entityDef, "EntityDefinition");
-               this.condition = condition;
-               this.readerAfterPlan = selectQueryReader();
-       }
-
-       public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
-               this.prefix = prefix;
-               checkNotNull(entityDef, "entityDef");
-               this.entityDef = entityDef;
-               checkNotNull(entityDef, "EntityDefinition");
-               this.condition = condition;
-               this.readerAfterPlan = selectQueryReader();
-       }
-
-       private void checkNotNull(Object o, String message){
-               if(o == null){
-                       throw new IllegalArgumentException(message + " should not be null");
-               }
-       }
-       
-       public EntityDefinition getEntityDefinition() {
-               return entityDef;
-       }
-       
-       public SearchCondition getSearchCondition() {
-               return condition;
-       }
-       
-       @Override
-       public void readAsStream() throws Exception{
-               readerAfterPlan._listeners.addAll(this._listeners);
-               readerAfterPlan.readAsStream();
-       }
-       
-       private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
-               final ORExpression query = condition.getQueryExpression();
-               IndexDefinition[] indexDefs = entityDef.getIndexes();
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
+
+    private EntityDefinition entityDef;
+    private SearchCondition condition;
+    private String prefix;
+    private StreamReader readerAfterPlan;
+
+    public GenericEntityStreamReader(String serviceName, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException {
+        this(serviceName, condition, null);
+    }
+
+    public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException {
+        this(entityDef, condition, entityDef.getPrefix());
+    }
+
+    public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
+        this.prefix = prefix;
+        checkNotNull(serviceName, "serviceName");
+        this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkNotNull(entityDef, "EntityDefinition");
+        this.condition = condition;
+        this.readerAfterPlan = selectQueryReader();
+    }
+
+    public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
+        this.prefix = prefix;
+        checkNotNull(entityDef, "entityDef");
+        this.entityDef = entityDef;
+        checkNotNull(entityDef, "EntityDefinition");
+        this.condition = condition;
+        this.readerAfterPlan = selectQueryReader();
+    }
+
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
+            throw new IllegalArgumentException(message + " should not be null");
+        }
+    }
+
+    public EntityDefinition getEntityDefinition() {
+        return entityDef;
+    }
+
+    public SearchCondition getSearchCondition() {
+        return condition;
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        readerAfterPlan.listeners.addAll(this.listeners);
+        readerAfterPlan.readAsStream();
+    }
+
+    private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
+        final ORExpression query = condition.getQueryExpression();
+        IndexDefinition[] indexDefs = entityDef.getIndexes();
 
         // Index just works with query condition
-               if (indexDefs != null && condition.getQueryExpression()!=null) {
-                       List<byte[]> rowkeys = new ArrayList<>();
-                       for (IndexDefinition index : indexDefs) {
-                               // Check unique index first
-                               if (index.isUnique()) {
-                                       final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
-                                       if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
-                                               LOG.info("Selectd query unique index " + index.getIndexName() + " for query: " + condition.getQueryExpression());
-                                               return new UniqueIndexStreamReader(index, condition, rowkeys);
-                                       }
-                               }
-                       }
-                       for (IndexDefinition index : indexDefs) {
-                               // Check non-clustered index
-                               if (!index.isUnique()) {
-                                       final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
-                                       if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
-                                               LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: " + condition.getQueryExpression().toString());
-                                               return new NonClusteredIndexStreamReader(index, condition, rowkeys);
-                                       }
-                               }
-                       }
-               }
-               return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
-       }
-
-       @Override
-       public long getLastTimestamp() {
-               return readerAfterPlan.getLastTimestamp();
-       }
-
-       @Override
-       public long getFirstTimestamp() {
-               return readerAfterPlan.getFirstTimestamp();
-       }
+        if (indexDefs != null && condition.getQueryExpression() != null) {
+            List<byte[]> rowkeys = new ArrayList<>();
+            for (IndexDefinition index : indexDefs) {
+                // Check unique index first
+                if (index.isUnique()) {
+                    final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+                    if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+                        LOG.info("Selectd query unique index " + index.getIndexName() + " for query: "
+                                 + condition.getQueryExpression());
+                        return new UniqueIndexStreamReader(index, condition, rowkeys);
+                    }
+                }
+            }
+            for (IndexDefinition index : indexDefs) {
+                // Check non-clustered index
+                if (!index.isUnique()) {
+                    final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+                    if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+                        LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: "
+                                 + condition.getQueryExpression().toString());
+                        return new NonClusteredIndexStreamReader(index, condition, rowkeys);
+                    }
+                }
+            }
+        }
+        return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return readerAfterPlan.getLastTimestamp();
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return readerAfterPlan.getFirstTimestamp();
+    }
 }
index bf72a36..15bdd20 100755 (executable)
@@ -31,121 +31,124 @@ import org.apache.eagle.common.DateTimeUtil;
 
 /**
  * multi-threading stream readers which only applies to time-series entity where we split the query into
- * different time range
- * 
- * When this class is used together with list query or aggregate query, be aware that the query's behavior could
- * be changed for example pageSize does not work well, output sequence is not determined
+ * different time range. When this class is used together with list query or aggregate query, be aware that
+ * the query's behavior could be changed; for example, pageSize does not work well and the output sequence
+ * is not determined.
  */
-public class GenericEntityStreamReaderMT extends StreamReader{
-       private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
-       private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>(); 
-       
-       public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads) throws Exception{
-               checkIsTimeSeries(serviceName);
-               checkNumThreads(numThreads);
-               long queryStartTime = condition.getStartTime();
-               long queryEndTime = condition.getEndTime();
-               long subStartTime = queryStartTime;
-               long subEndTime = 0;
-               long interval = (queryEndTime-queryStartTime) / numThreads;
-               for(int i=0; i<numThreads; i++){
-                       // split search condition by time range
-                       subStartTime = queryStartTime + i*interval;
-                       if(i == numThreads-1){
-                               subEndTime = queryEndTime;
-                       }else{
-                               subEndTime = subStartTime + interval;
-                       }
-                       //String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
-                       //String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
-                       SearchCondition sc = new SearchCondition(condition);
-                       sc.setStartTime(subStartTime);
-                       sc.setEndTime(subEndTime);
-                       GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
-                       readers.add(reader);
-               }
-       }
-       
-       private void checkIsTimeSeries(String serviceName) throws Exception{
-               EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
-               if(!ed.isTimeSeries()){
-                       throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
-               }
-       }
-       
-       private void checkNumThreads(int numThreads){
-               if(numThreads <= 0){
-                       throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
-               }
-       }
-       
-       /**
-        * default to 2 threads
-        * @param serviceName
-        * @param condition
-        */
-       public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception{
-               this(serviceName, condition, 2);
-       }
-       
-       @Override
-       public void readAsStream() throws Exception{
-               // populate listeners to all readers
-               for(EntityCreationListener l : _listeners){
-                       for(GenericEntityStreamReader r : readers){
-                               r.register(l);
-                       }
-               }
-
-               List<Future<Void>> futures = new ArrayList<Future<Void>>();
-               for(GenericEntityStreamReader r : readers){
-                       SingleReader reader = new SingleReader(r);
-                       Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
-                       futures.add(readFuture);
-               }
-               
-               // join threads and check exceptions
-               for(Future<Void> future : futures){
-                       try{
-                               future.get();
-                       }catch(Exception ex){
-                               LOG.error("Error in read", ex);
-                               throw ex;
-                       }
-               }
-       }
-       
-       private static class SingleReader implements Callable<Void>{
-               private GenericEntityStreamReader reader;
-               public SingleReader(GenericEntityStreamReader reader){
-                       this.reader = reader;
-               }
-               @Override
-               public Void call() throws Exception{
-                       reader.readAsStream();
-                       return null;
-               }
-       }
-
-       @Override
-       public long getLastTimestamp() {
-               long lastTimestamp = 0;
-               for (GenericEntityStreamReader reader : readers) {
-                       if (lastTimestamp < reader.getLastTimestamp()) {
-                               lastTimestamp = reader.getLastTimestamp();
-                       }
-               }
-               return lastTimestamp;
-       }
-
-       @Override
-       public long getFirstTimestamp() {
-               long firstTimestamp = 0;
-               for (GenericEntityStreamReader reader : readers) {
-                       if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
-                               firstTimestamp = reader.getLastTimestamp();
-                       }
-               }
-               return firstTimestamp;
-       }
+public class GenericEntityStreamReaderMT extends StreamReader {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
+    private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>();
+
+    public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads)
+        throws Exception {
+        checkIsTimeSeries(serviceName);
+        checkNumThreads(numThreads);
+        long queryStartTime = condition.getStartTime();
+        long queryEndTime = condition.getEndTime();
+        long subStartTime = queryStartTime;
+        long subEndTime = 0;
+        long interval = (queryEndTime - queryStartTime) / numThreads;
+        for (int i = 0; i < numThreads; i++) {
+            // split search condition by time range
+            subStartTime = queryStartTime + i * interval;
+            if (i == numThreads - 1) {
+                subEndTime = queryEndTime;
+            } else {
+                subEndTime = subStartTime + interval;
+            }
+            // String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
+            // String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
+            SearchCondition sc = new SearchCondition(condition);
+            sc.setStartTime(subStartTime);
+            sc.setEndTime(subEndTime);
+            GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
+            readers.add(reader);
+        }
+    }
+
+    private void checkIsTimeSeries(String serviceName) throws Exception {
+        EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        if (!ed.isTimeSeries()) {
+            throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
+        }
+    }
+
+    private void checkNumThreads(int numThreads) {
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
+        }
+    }
+
+    /**
+     * Defaults to 2 threads.
+     *
+     * @param serviceName
+     * @param condition
+     */
+    public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception {
+        this(serviceName, condition, 2);
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        // populate listeners to all readers
+        for (EntityCreationListener l : listeners) {
+            for (GenericEntityStreamReader r : readers) {
+                r.register(l);
+            }
+        }
+
+        List<Future<Void>> futures = new ArrayList<Future<Void>>();
+        for (GenericEntityStreamReader r : readers) {
+            SingleReader reader = new SingleReader(r);
+            Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
+            futures.add(readFuture);
+        }
+
+        // join threads and check exceptions
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (Exception ex) {
+                LOG.error("Error in read", ex);
+                throw ex;
+            }
+        }
+    }
+
+    private static class SingleReader implements Callable<Void> {
+        private GenericEntityStreamReader reader;
+
+        public SingleReader(GenericEntityStreamReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            reader.readAsStream();
+            return null;
+        }
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        long lastTimestamp = 0;
+        for (GenericEntityStreamReader reader : readers) {
+            if (lastTimestamp < reader.getLastTimestamp()) {
+                lastTimestamp = reader.getLastTimestamp();
+            }
+        }
+        return lastTimestamp;
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        long firstTimestamp = 0;
+        for (GenericEntityStreamReader reader : readers) {
+            if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
+                firstTimestamp = reader.getLastTimestamp();
+            }
+        }
+        return firstTimestamp;
+    }
 }
index 5c8b12d..926fcba 100755 (executable)
@@ -27,52 +27,53 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class GenericEntityWriter {
-       private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
-       private EntityDefinition entityDef;
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
+    private EntityDefinition entityDef;
 
-       public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException{
-               this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
-               checkNotNull(entityDef, "serviceName");
-       }
+    public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException {
+        this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkNotNull(entityDef, "serviceName");
+    }
 
-       public GenericEntityWriter(EntityDefinition entityDef) throws InstantiationException, IllegalAccessException{
-               this.entityDef = entityDef;
-               checkNotNull(entityDef, "serviceName");
-       }
-       
-       private void checkNotNull(Object o, String message) {
-               if(o == null){
-                       throw new IllegalArgumentException(message + " should not be null");
-               }
-       }
+    public GenericEntityWriter(EntityDefinition entityDef)
+        throws InstantiationException, IllegalAccessException {
+        this.entityDef = entityDef;
+        checkNotNull(entityDef, "serviceName");
+    }
 
-       /**
-        * @param entities
-        * @return row keys
-        * @throws Exception
-        */
-       public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception{
-               HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
-               List<String> rowkeys = new ArrayList<String>(entities.size());
-               List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
-               
-               try{
-                       writer.open();
-                       for(TaggedLogAPIEntity entity : entities){
-                               final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
-                               logs.add(entityLog);
-                       }
-                       List<byte[]> bRowkeys  = writer.write(logs);
-                       for (byte[] rowkey : bRowkeys) {
-                               rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
-                       }
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
+            throw new IllegalArgumentException(message + " should not be null");
+        }
+    }
 
-               }catch(Exception ex){
-                       LOG.error("fail writing tagged log", ex);
-                       throw ex;
-               }finally{
-                       writer.close();
-               }
-               return rowkeys;
-       }
+    /**
+     * @param entities
+     * @return row keys
+     * @throws Exception
+     */
+    public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception {
+        HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
+        List<String> rowkeys = new ArrayList<String>(entities.size());
+        List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
+
+        try {
+            writer.open();
+            for (TaggedLogAPIEntity entity : entities) {
+                final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
+                logs.add(entityLog);
+            }
+            List<byte[]> bRowkeys = writer.write(logs);
+            for (byte[] rowkey : bRowkeys) {
+                rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
+            }
+
+        } catch (Exception ex) {
+            LOG.error("fail writing tagged log", ex);
+            throw ex;
+        } finally {
+            writer.close();
+        }
+        return rowkeys;
+    }
 }
index 9f6937b..56cd453 100755 (executable)
@@ -21,33 +21,35 @@ import org.apache.eagle.log.entity.meta.*;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /**
- * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name
- * metric name is used to partition the metric tables
+ * GenericMetricEntity should use the prefix field inherited from TaggedLogAPIEntity as the metric name.
+ * The metric name is used to partition the metric tables.
  */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("eagle_metric")
 @ColumnFamily("f")
 @Prefix(GenericMetricEntity.GENERIC_METRIC_PREFIX_PLACE_HOLDER)
 @Service(GenericMetricEntity.GENERIC_METRIC_SERVICE)
 @TimeSeries(true)
-@Metric(interval=60000)
+@Metric(interval = 60000)
 @ServicePath(path = "/metric")
 // TODO:
-@Tags({"site","application","policyId","alertExecutorId", "streamName","source","partitionSeq"})
+@Tags({
+       "site", "application", "policyId", "alertExecutorId", "streamName", "source", "partitionSeq"
+    })
 public class GenericMetricEntity extends TaggedLogAPIEntity {
-       public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
-       public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
-       public static final String VALUE_FIELD ="value";
+    public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+    public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
+    public static final String VALUE_FIELD = "value";
 
-       @Column("a")
-       private double[] value;
+    @Column("a")
+    private double[] value;
 
-       public double[] getValue() {
-               return value;
-       }
+    public double[] getValue() {
+        return value;
+    }
 
-       public void setValue(double[] value) {
-               this.value = value;
-               pcs.firePropertyChange("value", null, null);
-       }
-}
\ No newline at end of file
+    public void setValue(double[] value) {
+        this.value = value;
+        pcs.firePropertyChange("value", null, null);
+    }
+}
index 84b02ae..bc99a81 100755 (executable)
@@ -23,32 +23,37 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class GenericMetricEntityBatchReader  implements EntityCreationListener{
-       private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
-       
-       private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
-       private GenericEntityStreamReader reader;
-       
-       public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception{
-               reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition, metricName);
-       }
-       
-       public long getLastTimestamp() {
-               return reader.getLastTimestamp();
-       }
-       public long getFirstTimestamp() {
-               return reader.getFirstTimestamp();
-       }
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity){
-               entities.add(entity);
-       }
-       
-       @SuppressWarnings("unchecked")
-       public <T> List<T> read() throws Exception{
-               if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
-               reader.register(this);
-               reader.readAsStream();
-               return (List<T>)entities;
-       }
+public class GenericMetricEntityBatchReader implements EntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
+
+    private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+    private GenericEntityStreamReader reader;
+
+    public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception {
+        reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition,
+                                               metricName);
+    }
+
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> List<T> read() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start reading as batch mode");
+        }
+        reader.register(this);
+        reader.readAsStream();
+        return (List<T>)entities;
+    }
 }
index 1cf3905..216022f 100755 (executable)
@@ -25,74 +25,79 @@ import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 
-public class GenericMetricEntityDecompactionStreamReader extends StreamReader implements EntityCreationListener{
-       @SuppressWarnings("unused")
-       private static final Logger LOG = LoggerFactory.getLogger(GenericMetricEntityDecompactionStreamReader.class);
-       private GenericEntityStreamReader reader;
-       private EntityDefinition ed;
-       private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
-       private long start;
-       private long end;
-       private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
-       
-       /**
-        * it makes sense that serviceName should not be provided while metric name should be provided as prefix
-        * @param metricName
-        * @param condition
-        * @throws InstantiationException
-        * @throws IllegalAccessException
-        * @throws ParseException
-        */
-       public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition) throws InstantiationException, IllegalAccessException, ParseException{
-               ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
-               checkIsMetric(ed);
-               reader = new GenericEntityStreamReader(serviceName, condition, metricName);
-               start = condition.getStartTime();
-               end = condition.getEndTime();
-       }
-       
-       private void checkIsMetric(EntityDefinition ed){
-               if(ed.getMetricDefinition() == null)
-                       throw new IllegalArgumentException("Only metric entity comes here");
-       }
-       
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-               GenericMetricEntity e = (GenericMetricEntity)entity;
-               double[] value = e.getValue();
-               if(value != null) {
-                       int count =value.length;
-                       @SuppressWarnings("unused")
-                       Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
-                       for (int i = 0; i < count; i++) {
-                               long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
-                               // exclude those entity which is not within the time range in search condition. [start, end)
-                               if (ts < start || ts >= end) {
-                                       continue;
-                               }
-                               single.setTimestamp(ts);
-                               single.setTags(entity.getTags());
-                               single.setValue(e.getValue()[i]);
-                               for (EntityCreationListener l : _listeners) {
-                                       l.entityCreated(single);
-                               }
-                       }
-               }
-       }
-       
-       @Override
-       public void readAsStream() throws Exception{
-               reader.register(this);
-               reader.readAsStream();
-       }
+public class GenericMetricEntityDecompactionStreamReader extends StreamReader
+    implements EntityCreationListener {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory
+        .getLogger(GenericMetricEntityDecompactionStreamReader.class);
+    private GenericEntityStreamReader reader;
+    private EntityDefinition ed;
+    private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
+    private long start;
+    private long end;
+    private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
 
-       @Override
-       public long getLastTimestamp() {
-               return reader.getLastTimestamp();
-       }
+    /**
+     * The serviceName should not be provided here; instead, the metric name should be provided as the prefix.
+     *
+     * @param metricName
+     * @param condition
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ParseException
+     */
+    public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException, ParseException {
+        ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkIsMetric(ed);
+        reader = new GenericEntityStreamReader(serviceName, condition, metricName);
+        start = condition.getStartTime();
+        end = condition.getEndTime();
+    }
 
-       @Override
-       public long getFirstTimestamp() {
-               return reader.getFirstTimestamp();
-       }
-}
\ No newline at end of file
+    private void checkIsMetric(EntityDefinition ed) {
+        if (ed.getMetricDefinition() == null) {
+            throw new IllegalArgumentException("Only metric entity comes here");
+        }
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+        GenericMetricEntity e = (GenericMetricEntity)entity;
+        double[] value = e.getValue();
+        if (value != null) {
+            int count = value.length;
+            @SuppressWarnings("unused")
+            Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
+            for (int i = 0; i < count; i++) {
+                long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
+                // exclude entities that are not within the time range of the search condition: [start, end)
+                if (ts < start || ts >= end) {
+                    continue;
+                }
+                single.setTimestamp(ts);
+                single.setTags(entity.getTags());
+                single.setValue(e.getValue()[i]);
+                for (EntityCreationListener l : listeners) {
+                    l.entityCreated(single);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        reader.register(this);
+        reader.readAsStream();
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+}
index acd1290..8ead7cd 100644 (file)
@@ -22,13 +22,13 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
  * just a shadow class to avoid dynamically create the class and instantiate using reflection
  */
 public class GenericMetricShadowEntity extends TaggedLogAPIEntity {
-       private double value;
+    private double value;
 
-       public double getValue() {
-               return value;
-       }
+    public double getValue() {
+        return value;
+    }
 
-       public void setValue(double value) {
-               this.value = value;
-       }
+    public void setValue(double value) {
+        this.value = value;
+    }
 }
index 6869c7c..97f538c 100644 (file)
@@ -35,24 +35,27 @@ import java.util.Map;
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(propOrder = {"success","exception","meta","type","obj"})
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@XmlType(propOrder = {
+                      "success", "exception", "meta", "type", "obj"
+    })
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonDeserialize(using = GenericServiceAPIResponseEntityDeserializer.class)
-@JsonIgnoreProperties(ignoreUnknown=true)
-public class GenericServiceAPIResponseEntity<T>{
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GenericServiceAPIResponseEntity<T> {
     /**
      * Please use primitive type of value in meta as possible
      */
-    private Map<String,Object> meta;
-       private boolean success;
-       private String exception;
+    private Map<String, Object> meta;
+    private boolean success;
+    private String exception;
     private List<T> obj;
     private Class<T> type;
 
-    public GenericServiceAPIResponseEntity(){
+    public GenericServiceAPIResponseEntity() {
         // default constructor
     }
-    public GenericServiceAPIResponseEntity(Class<T> type){
+
+    public GenericServiceAPIResponseEntity(Class<T> type) {
         this.setType(type);
     }
 
@@ -72,7 +75,7 @@ public class GenericServiceAPIResponseEntity<T>{
         this.obj = obj;
     }
 
-    public void setObj(List<T> obj,Class<T> type) {
+    public void setObj(List<T> obj, Class<T> type) {
         this.setObj(obj);
         this.setType(type);
     }
@@ -85,10 +88,10 @@ public class GenericServiceAPIResponseEntity<T>{
      * Set the first object's class as type
      */
     @SuppressWarnings("unused")
-    public void setTypeByObj(){
-        for(T t:this.obj){
-            if(this.type == null && t!=null){
-                this.type = (Class<T>) t.getClass();
+    public void setTypeByObj() {
+        for (T t : this.obj) {
+            if (this.type == null && t != null) {
+                this.type = (Class<T>)t.getClass();
             }
         }
     }
@@ -102,17 +105,19 @@ public class GenericServiceAPIResponseEntity<T>{
         this.type = type;
     }
 
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-
-    public void setException(Exception exceptionObj){
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(Exception exceptionObj) {
         this.exception = EagleExceptionWrapper.wrap(exceptionObj);
     }
 }
index 836295b..8ccb43a 100644 (file)
@@ -30,57 +30,61 @@ import java.io.IOException;
 import java.util.*;
 
 /**
- * @since 3/18/15
+ * @since 3/18/15.
  */
-public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserializer<GenericServiceAPIResponseEntity> {
-    private final static String META_FIELD="meta";
-    private final static String SUCCESS_FIELD="success";
-    private final static String EXCEPTION_FIELD="exception";
-    private final static String OBJ_FIELD="obj";
-    private final static String TYPE_FIELD="type";
+public class GenericServiceAPIResponseEntityDeserializer
+    extends JsonDeserializer<GenericServiceAPIResponseEntity> {
+    private static final String META_FIELD = "meta";
+    private static final String SUCCESS_FIELD = "success";
+    private static final String EXCEPTION_FIELD = "exception";
+    private static final String OBJ_FIELD = "obj";
+    private static final String TYPE_FIELD = "type";
 
     @Override
-    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt)
+        throws IOException, JsonProcessingException {
         GenericServiceAPIResponseEntity entity = new GenericServiceAPIResponseEntity();
         ObjectCodec objectCodec = jp.getCodec();
 
         JsonNode rootNode = jp.getCodec().readTree(jp);
-        if(rootNode.isObject()){
-            Iterator<Map.Entry<String,JsonNode>> fields = rootNode.fields();
+        if (rootNode.isObject()) {
+            Iterator<Map.Entry<String, JsonNode>> fields = rootNode.fields();
             JsonNode objNode = null;
-            while(fields.hasNext()){
-                Map.Entry<String,JsonNode> field = fields.next();
-                if (META_FIELD.equals(field.getKey()) && field.getValue() != null)
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> field = fields.next();
+                if (META_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setMeta(objectCodec.readValue(field.getValue().traverse(), Map.class));
-                else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
+                } else if (SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setSuccess(field.getValue().booleanValue());
-                }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
+                } else if (EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setException(new Exception(field.getValue().textValue()));
-                }else if(TYPE_FIELD.endsWith(field.getKey())  && field.getValue() != null){
-                    Preconditions.checkNotNull(field.getValue().textValue(),"Response type class is null");
+                } else if (TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null) {
+                    Preconditions.checkNotNull(field.getValue().textValue(), "Response type class is null");
                     try {
                         entity.setType(Class.forName(field.getValue().textValue()));
                     } catch (ClassNotFoundException e) {
                         throw new IOException(e);
                     }
-                }else if(OBJ_FIELD.equals(field.getKey()) && field.getValue() != null){
+                } else if (OBJ_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     objNode = field.getValue();
                 }
             }
 
-            if(objNode!=null) {
-                JavaType collectionType=null;
+            if (objNode != null) {
+                JavaType collectionType = null;
                 if (entity.getType() != null) {
-                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entity.getType());
-                }else{
-                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, Map.class);
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+                                                                                           entity.getType());
+                } else {
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+                                                                                           Map.class);
                 }
                 List obj = objectCodec.readValue(objNode.traverse(), collectionType);
                 entity.setObj(obj);
             }
-        }else{
+        } else {
             throw new IOException("root node is not object");
         }
         return entity;
     }
-}
\ No newline at end of file
+}
index 7a38033..32f382b 100755 (executable)
@@ -30,216 +30,232 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 public class HBaseInternalLogHelper {
-       private final static Logger LOG  = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
-
-       private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
-
-       /**
-        *
-        * @param ed
-        * @param r
-        * @param qualifiers if null, return all qualifiers defined in ed
-        * @return
-        */
-       public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
-               final byte[] row = r.getRow();
-               // skip the first 4 bytes : prefix
-               final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
-               long timestamp = ByteUtil.bytesToLong(row, offset);
-               // reverse timestamp
-               timestamp = Long.MAX_VALUE - timestamp;
-               final byte[] family = ed.getColumnFamily().getBytes();
-               final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
-
-               if (qualifiers != null) {
-                       int count = qualifiers.length;
-                       final byte[][] values = new byte[count][];
-                       for (int i = 0; i < count; i++) {
-                               // TODO if returned value is null, it means no this column for this row, so why set null to the object?
-                               values[i] = r.getValue(family, qualifiers[i]);
-                               allQualifierValues.put(new String(qualifiers[i]), values[i]);
-                       }
-               }else{
-                       // return all qualifiers
-                       for(KeyValue kv:r.list()){
-                               byte[] qualifier = kv.getQualifier();
-                               byte[] value = kv.getValue();
-                               allQualifierValues.put(new String(qualifier),value);
-                       }
-               }
-               final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
-               return log;
-       }
-
-       /**
-        *
-        * @param ed
-        * @param row
-        * @param timestamp
-        * @param allQualifierValues <code>Map &lt; Qualifier name (not display name),Value in bytes array &gt;</code>
-        * @return
-        */
-       public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp, Map<String, byte[]> allQualifierValues) {
-               InternalLog log = new InternalLog();
-               String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
-               log.setEncodedRowkey(myRow);
-               log.setPrefix(ed.getPrefix());
-               log.setTimestamp(timestamp);
-
-               Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
-               Map<String, String> logTags = new HashMap<String, String>();
-               Map<String, Object> extra = null;
-
-               Map<String,Double> doubleMap = null;
-               // handle with metric
-               boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
-               double[] metricValueArray = null;
-
-               for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
-                       if (ed.isTag(entry.getKey())) {
-                               if (entry.getValue() != null) {
-                                       logTags.put(entry.getKey(), new String(entry.getValue()));
-                               }else if (TokenConstant.isExpression(entry.getKey())){
-                                       if(doubleMap == null) doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
-                                       // Caculate expression based fields
-                                       String expression = TokenConstant.parseExpressionContent(entry.getKey());
-                                       if (extra == null) extra = new HashMap<String, Object>();
-
-                                       // Evaluation expression as output based on entity
-                                       // -----------------------------------------------
-                                       // 1) Firstly, check whether is metric entity and expression requires value and also value is not number (i.e. double[])
-                                       // 2) Treat all required fields as double, if not number, then set result as NaN
-
-                                       try {
-                                               ExpressionParser parser = ExpressionParser.parse(expression);
-                                               boolean isRequiringValue = parser.getDependentFields().contains(GenericMetricEntity.VALUE_FIELD);
-
-                                               if(isMetricEntity && isRequiringValue && doubleMap.get(GenericMetricEntity.VALUE_FIELD)!=null
-                                                               && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) // EntityQualifierUtils will convert non-number field into Double.NaN
-                                               {
-                                                       // if dependent fields require "value"
-                                                       // and value exists but value's type is double[] instead of double
-
-                                                       // handle with metric value array based expression
-                                                       // lazily extract metric value as double array if required
-                                                       if(metricValueArray == null){
-                                                               // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
-                                                               Qualifier qualifier = ed.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
-                                                               EntitySerDeser serDeser = qualifier.getSerDeser();
-                                                               if(serDeser instanceof DoubleArraySerDeser){
-                                                                       byte[] value = allQualifierValues.get(qualifier.getQualifierName());
-                                                                       if(value !=null ) metricValueArray = (double[]) serDeser.deserialize(value);
-                                                               }
-                                                               // }
-                                                       }
-
-                                                       if(metricValueArray!=null){
-                                                               double[] resultBucket = new double[metricValueArray.length];
-                                                               Map<String, Double> _doubleMap = new HashMap<String,Double>(doubleMap);
-                                                               _doubleMap.remove(entry.getKey());
-                                                               for(int i=0;i< resultBucket.length;i++) {
-                                                                       _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
-                                                                       resultBucket[i]=  parser.eval(_doubleMap);
-                                                               }
-                                                               extra.put(expression,resultBucket);
-                                                       }else{
-                                                               LOG.warn("Failed convert metric value into double[] type which is required by expression: "+expression);
-                                                               // if require value in double[] is NaN
-                                                               double value = parser.eval(doubleMap);
-                                                               extra.put(expression, value);
-                                                       }
-                                               }else {
-                                                       double value = parser.eval(doubleMap);
-                                                       extra.put(expression, value);
-                                                       // LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
-                                               }
-                                       } catch (Exception e) {
-                                               LOG.error("Failed to eval expression "+expression+", exception: "+e.getMessage(),e);
-                                       }
-                               }
-                       } else {
-                               logQualifierValues.put(entry.getKey(),entry.getValue());
-                       }
-               }
-               log.setQualifierValues(logQualifierValues);
-               log.setTags(logTags);
-               log.setExtraValues(extra);
-               return log;
-       }
-       
-       public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef) throws Exception {
-               Map<String, byte[]> qualifierValues = log.getQualifierValues();
-               TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef);
-               if (entity.getTags() == null && log.getTags() != null) {
-                       entity.setTags(log.getTags());
-               }
-               entity.setExp(log.getExtraValues());
-               entity.setTimestamp(log.getTimestamp());
-               entity.setEncodedRowkey(log.getEncodedRowkey());
-               entity.setPrefix(log.getPrefix());
-               return entity;
-       }
-       
-       public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef) throws Exception {
-               final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size());
-               for (InternalLog log : logs) {
-                       result.add(buildEntity(log, entityDef));
-               }
-               return result;
-       }
-       
-       public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
-               final byte[][] result = new byte[outputFields.size()][];
-               int index = 0;
-               for(String field : outputFields){
-                       // convert displayName to qualifierName
-                       Qualifier q = entityDef.getDisplayNameMap().get(field);
-                       if(q == null){ // for tag case
-                               result[index++] = field.getBytes();
-                       }else{ // for qualifier case
-                               result[index++] = q.getQualifierName().getBytes();
-                       }
-               }
-               return result;
-       }
-
-       public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception {
-               final InternalLog log = new InternalLog();
-               final Map<String, String> inputTags = entity.getTags();
-               final Map<String, String> tags = new TreeMap<String, String>();
-               if(inputTags!=null) {
-                       for (Map.Entry<String, String> entry : inputTags.entrySet()) {
-                               tags.put(entry.getKey(), entry.getValue());
-                       }
-               }
-               log.setTags(tags);
-               if(entityDef.isTimeSeries()){
-                       log.setTimestamp(entity.getTimestamp());
-               }else{
-                       log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0
-               }
-               
-               // For Metric entity, prefix is populated along with entity instead of EntityDefinition
-               if(entity.getPrefix() != null && !entity.getPrefix().isEmpty()){
-                       log.setPrefix(entity.getPrefix());
-               }else{
-                       log.setPrefix(entityDef.getPrefix());
-               }
-               
-               log.setPartitions(entityDef.getPartitions());
-               EntitySerDeserializer des = new EntitySerDeserializer();
-               log.setQualifierValues(des.writeValue(entity, entityDef));
-               
-               final IndexDefinition[] indexDefs = entityDef.getIndexes();
-               if (indexDefs != null) {
-                       final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
-                       for (int i = 0; i < indexDefs.length; ++i) {
-                               final IndexDefinition indexDef = indexDefs[i];
-                               final byte[] indexRowkey = indexDef.generateIndexRowkey(entity);
-                               indexRowkeys.add(indexRowkey);
-                       }
-                       log.setIndexRowkeys(indexRowkeys);
-               }
-               return log;
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
+
+    private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
+
+    /**
+     * @param ed
+     * @param r
+     * @param qualifiers if null, return all qualifiers defined in ed
+     * @return
+     */
+    public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
+        final byte[] row = r.getRow();
+        // skip the first 4 bytes : prefix
+        final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
+        long timestamp = ByteUtil.bytesToLong(row, offset);
+        // reverse timestamp
+        timestamp = Long.MAX_VALUE - timestamp;
+        final byte[] family = ed.getColumnFamily().getBytes();
+        final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
+
+        if (qualifiers != null) {
+            int count = qualifiers.length;
+            final byte[][] values = new byte[count][];
+            for (int i = 0; i < count; i++) {
+                // TODO if the returned value is null, it means this row has no such column, so why set
+                // null on the object?
+                values[i] = r.getValue(family, qualifiers[i]);
+                allQualifierValues.put(new String(qualifiers[i]), values[i]);
+            }
+        } else {
+            // return all qualifiers
+            for (KeyValue kv : r.list()) {
+                byte[] qualifier = kv.getQualifier();
+                byte[] value = kv.getValue();
+                allQualifierValues.put(new String(qualifier), value);
+            }
+        }
+        final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
+        return log;
+    }
+
+    /**
+     * @param ed
+     * @param row
+     * @param timestamp
+     * @param allQualifierValues
+     *            <code>Map &lt; Qualifier name (not display name),Value in bytes array &gt;</code>
+     * @return
+     */
+    public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp,
+                                          Map<String, byte[]> allQualifierValues) {
+        InternalLog log = new InternalLog();
+        String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
+        log.setEncodedRowkey(myRow);
+        log.setPrefix(ed.getPrefix());
+        log.setTimestamp(timestamp);
+
+        Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
+        Map<String, String> logTags = new HashMap<String, String>();
+        Map<String, Object> extra = null;
+
+        Map<String, Double> doubleMap = null;
+        // handle with metric
+        boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
+        double[] metricValueArray = null;
+
+        for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
+            if (ed.isTag(entry.getKey())) {
+                if (entry.getValue() != null) {
+                    logTags.put(entry.getKey(), new String(entry.getValue()));
+                } else if (TokenConstant.isExpression(entry.getKey())) {
+                    if (doubleMap == null) {
+                        doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
+                    }
+                    // Calculate expression-based fields
+                    String expression = TokenConstant.parseExpressionContent(entry.getKey());
+                    if (extra == null) {
+                        extra = new HashMap<String, Object>();
+                    }
+
+                    // Evaluation expression as output based on entity
+                    // -----------------------------------------------
+                    // 1) Firstly, check whether is metric entity and expression requires value and also value
+                    // is not number (i.e. double[])
+                    // 2) Treat all required fields as double, if not number, then set result as NaN
+
+                    try {
+                        ExpressionParser parser = ExpressionParser.parse(expression);
+                        boolean isRequiringValue = parser.getDependentFields()
+                            .contains(GenericMetricEntity.VALUE_FIELD);
+
+                        if (isMetricEntity && isRequiringValue
+                            && doubleMap.get(GenericMetricEntity.VALUE_FIELD) != null
+                            && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) {
+                            // EntityQualifierUtils will convert non-number field into Double.NaN
+                            // if dependent fields require "value"
+                            // and value exists but value's type is double[] instead of double
+
+                            // handle with metric value array based expression
+                            // lazily extract metric value as double array if required
+                            if (metricValueArray == null) {
+                                // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
+                                Qualifier qualifier = ed.getDisplayNameMap()
+                                    .get(GenericMetricEntity.VALUE_FIELD);
+                                EntitySerDeser serDeser = qualifier.getSerDeser();
+                                if (serDeser instanceof DoubleArraySerDeser) {
+                                    byte[] value = allQualifierValues.get(qualifier.getQualifierName());
+                                    if (value != null) {
+                                        metricValueArray = (double[])serDeser.deserialize(value);
+                                    }
+                                }
+                                // }
+                            }
+
+                            if (metricValueArray != null) {
+                                double[] resultBucket = new double[metricValueArray.length];
+                                Map<String, Double> _doubleMap = new HashMap<String, Double>(doubleMap);
+                                _doubleMap.remove(entry.getKey());
+                                for (int i = 0; i < resultBucket.length; i++) {
+                                    _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
+                                    resultBucket[i] = parser.eval(_doubleMap);
+                                }
+                                extra.put(expression, resultBucket);
+                            } else {
+                                LOG.warn("Failed convert metric value into double[] type which is required by expression: "
+                                         + expression);
+                                // if require value in double[] is NaN
+                                double value = parser.eval(doubleMap);
+                                extra.put(expression, value);
+                            }
+                        } else {
+                            double value = parser.eval(doubleMap);
+                            extra.put(expression, value);
+                            // LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to eval expression " + expression + ", exception: "
+                                  + e.getMessage(), e);
+                    }
+                }
+            } else {
+                logQualifierValues.put(entry.getKey(), entry.getValue());
+            }
+        }
+        log.setQualifierValues(logQualifierValues);
+        log.setTags(logTags);
+        log.setExtraValues(extra);
+        return log;
+    }
+
+    public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef)
+        throws Exception {
+        Map<String, byte[]> qualifierValues = log.getQualifierValues();
+        TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef);
+        if (entity.getTags() == null && log.getTags() != null) {
+            entity.setTags(log.getTags());
+        }
+        entity.setExp(log.getExtraValues());
+        entity.setTimestamp(log.getTimestamp());
+        entity.setEncodedRowkey(log.getEncodedRowkey());
+        entity.setPrefix(log.getPrefix());
+        return entity;
+    }
+
+    public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef)
+        throws Exception {
+        final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size());
+        for (InternalLog log : logs) {
+            result.add(buildEntity(log, entityDef));
+        }
+        return result;
+    }
+
+    public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
+        final byte[][] result = new byte[outputFields.size()][];
+        int index = 0;
+        for (String field : outputFields) {
+            // convert displayName to qualifierName
+            Qualifier q = entityDef.getDisplayNameMap().get(field);
+            if (q == null) { // for tag case
+                result[index++] = field.getBytes();
+            } else { // for qualifier case
+                result[index++] = q.getQualifierName().getBytes();
+            }
+        }
+        return result;
+    }
+
+    public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef)
+        throws Exception {
+        final InternalLog log = new InternalLog();
+        final Map<String, String> inputTags = entity.getTags();
+        final Map<String, String> tags = new TreeMap<String, String>();
+        if (inputTags != null) {
+            for (Map.Entry<String, String> entry : inputTags.entrySet()) {
+                tags.put(entry.getKey(), entry.getValue());
+            }
+        }
+        log.setTags(tags);
+        if (entityDef.isTimeSeries()) {
+            log.setTimestamp(entity.getTimestamp());
+        } else {
+            log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0
+        }
+
+        // For Metric entity, prefix is populated along with entity instead of EntityDefinition
+        if (entity.getPrefix() != null && !entity.getPrefix().isEmpty()) {
+            log.setPrefix(entity.getPrefix());
+        } else {
+            log.setPrefix(entityDef.getPrefix());
+        }
+
+        log.setPartitions(entityDef.getPartitions());
+        EntitySerDeserializer des = new EntitySerDeserializer();
+        log.setQualifierValues(des.writeValue(entity, entityDef));
+
+        final IndexDefinition[] indexDefs = entityDef.getIndexes();
+        if (indexDefs != null) {
+            final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
+            for (int i = 0; i < indexDefs.length; ++i) {
+                final IndexDefinition indexDef = indexDefs[i];
+                final byte[] indexRowkey = indexDef.generateIndexRowkey(entity);
+                indexRowkeys.add(indexRowkey);
+            }
+            log.setIndexRowkeys(indexRowkeys);
+        }
+        return log;
+    }
 }
index c8b9a33..d5c8e2c 100755 (executable)
@@ -28,59 +28,62 @@ import java.util.Date;
 import java.util.List;
 
 public class HBaseLogReader2 extends AbstractHBaseLogReader<InternalLog> {
-       protected ResultScanner rs;
+    protected ResultScanner rs;
 
-       public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers) {
-               super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
-       }
+    public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+                           Filter filter, String lastScanKey, byte[][] outputQualifiers) {
+        super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
+    }
 
-       /**
-        * This constructor supports partition.
-        *
-        * @param ed               entity definition
-        * @param partitions       partition values, which is sorted in partition definition order. TODO: in future we need to support
-        *                         multiple values for one partition field
-        * @param startTime        start time of the query
-        * @param endTime          end time of the query
-        * @param filter           filter for the hbase scan
-        * @param lastScanKey      the key of last scan
-        * @param outputQualifiers the bytes of output qualifier names
-        * @param prefix           can be populated from outside world specifically for generic metric reader
-        */
-       public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) {
-               super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
-       }
+    /**
+     * This constructor supports partition.
+     *
+     * @param ed entity definition
+     * @param partitions partition values, which is sorted in partition definition order. TODO: in future we
+     *            need to support multiple values for one partition field
+     * @param startTime start time of the query
+     * @param endTime end time of the query
+     * @param filter filter for the hbase scan
+     * @param lastScanKey the key of last scan
+     * @param outputQualifiers the bytes of output qualifier names
+     * @param prefix can be populated from outside world specifically for generic metric reader
+     */
+    public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+                           Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) {
+        super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
+    }
 
-       @Override
-       protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
-               rs = tbl.getScanner(scan);
-       }
+    @Override
+    protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
+        rs = tbl.getScanner(scan);
+    }
 
-       /**
-        * <h2>Close:</h2>
-        * 1. Call super.close(): release current table connection <br></br>
-        * 2. Close Scanner<br></br>
-        *
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               super.close();
-               if(rs != null){
-                       rs.close();
-               }
-       }
+    /**
+     * <h2>Close:</h2> 1. Call super.close(): release current table connection <br>
+     * <br>
+     * 2. Close Scanner<br>
+     * <br>
+     *
+     * @throws IOException
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        if (rs != null) {
+            rs.close();
+        }
+    }
 
-       @Override
-       public InternalLog read() throws IOException {
-               if (rs == null)
-                       throw new IllegalArgumentException(
-                                       "ResultScanner must be initialized before reading");
-               InternalLog t = null;
-               Result r = rs.next();
-               if (r != null) {
-                       t = HBaseInternalLogHelper.parse(_ed, r, qualifiers);
-               }
-               return t;
-       }
+    @Override
+    public InternalLog read() throws IOException {
+        if (rs == null) {
+            throw new IllegalArgumentException("ResultScanner must be initialized before reading");
+        }
+        InternalLog t = null;
+        Result r = rs.next();
+        if (r != null) {
+            t = HBaseInternalLogHelper.parse(ed, r, qualifiers);
+        }
+        return t;
+    }
 }
index 059ee7f..1cf23b6 100644 (file)
@@ -29,124 +29,125 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HBaseLogWriter implements LogWriter {
-       private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
-       private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
-       
-       private HTableInterface tbl;
-       private String table;
-       private String columnFamily;
-       
-       public HBaseLogWriter(String table, String columnFamily) {
-               // TODO assert for non-null of table and columnFamily
-               this.table = table;
-               this.columnFamily = columnFamily;
-       }
-       
-       @Override
-       public void open() throws IOException {
-               try{
-                       tbl = EagleConfigFactory.load().getHTable(this.table);
-//                     LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" : "disabled"));
-               }catch(Exception ex){
-                       LOG.error("Cannot create htable", ex);
-                       throw new IOException(ex);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               if(tbl != null){
-                       new HTableFactory().releaseHTableInterface(tbl);
-               }
-       }
-
-       @Override
-       public void flush() throws IOException {
-               tbl.flushCommits();
-       }
-       
-       protected void populateColumnValues(Put p, InternalLog log){
-               Map<String, byte[]> qualifierValues = log.getQualifierValues();
-               // iterate all qualifierValues
-               for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){
-                       p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
-               }
-               
-               Map<String, String> tags = log.getTags();
-               // iterate all tags, each tag will be stored as a column qualifier
-               if(tags != null){
-                       for(Map.Entry<String, String> entry : tags.entrySet()){
-                               // TODO need a consistent handling of null values
-                               if(entry.getValue() != null)
-                                       p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
-                       }
-               }
-       }
-
-       /**
-        * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
-        */
-       @Override
-       public byte[] write(InternalLog log) throws IOException{
-               final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
-               final Put p = new Put(rowkey);
-               populateColumnValues(p, log);
-               tbl.put(p);
-               final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-               if (indexRowkeys != null) {
-                       writeIndexes(rowkey, indexRowkeys);
-               }
-               return rowkey;
-       }
-
-       /**
-        * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
-        */
-       public List<byte[]> write(List<InternalLog> logs) throws IOException{
-               final List<Put> puts = new ArrayList<Put>(logs.size());
-               final List<byte[]> result = new ArrayList<byte[]>(logs.size());
-               for (InternalLog log : logs) {
-                       final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
-                       final Put p = new Put(rowkey);
-                       populateColumnValues(p, log);
-                       puts.add(p);
-                       final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-                       if (indexRowkeys != null) {
-                               writeIndexes(rowkey, indexRowkeys, puts);
-                       }
-                       result.add(rowkey);
-               }
-               tbl.put(puts);
-               return result;
-       }
-       
-       @Override
-       public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException{
-               Put p = new Put(rowkey);
-               populateColumnValues(p, log);
-               tbl.put(p);
-               final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-               if (indexRowkeys != null) {
-                       writeIndexes(rowkey, indexRowkeys);
-               }
-       }
-
-       private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
-               for (byte[] indexRowkey : indexRowkeys) {
-                       Put p = new Put(indexRowkey);
-                       p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
-                       tbl.put(p);
-               }
-       }
-
-       private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
-               for (byte[] indexRowkey : indexRowkeys) {
-                       Put p = new Put(indexRowkey);
-                       p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
-                       puts.add(p);
-//                     tbl.put(p);
-               }
-       }
-
-       
+    private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
+    private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
+
+    private HTableInterface tbl;
+    private String table;
+    private String columnFamily;
+
+    public HBaseLogWriter(String table, String columnFamily) {
+        // TODO assert for non-null of table and columnFamily
+        this.table = table;
+        this.columnFamily = columnFamily;
+    }
+
+    @Override
+    public void open() throws IOException {
+        try {
+            tbl = EagleConfigFactory.load().getHTable(this.table);
+            // LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" :
+            // "disabled"));
+        } catch (Exception ex) {
+            LOG.error("Cannot create htable", ex);
+            throw new IOException(ex);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (tbl != null) {
+            new HTableFactory().releaseHTableInterface(tbl);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        tbl.flushCommits();
+    }
+
+    protected void populateColumnValues(Put p, InternalLog log) {
+        Map<String, byte[]> qualifierValues = log.getQualifierValues();
+        // iterate all qualifierValues
+        for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) {
+            p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
+        }
+
+        Map<String, String> tags = log.getTags();
+        // iterate all tags, each tag will be stored as a column qualifier
+        if (tags != null) {
+            for (Map.Entry<String, String> entry : tags.entrySet()) {
+                // TODO need a consistent handling of null values
+                if (entry.getValue() != null) {
+                    p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
+                }
+            }
+        }
+    }
+
+    /**
+     * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+     */
+    @Override
+    public byte[] write(InternalLog log) throws IOException {
+        final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+        final Put p = new Put(rowkey);
+        populateColumnValues(p, log);
+        tbl.put(p);
+        final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+        if (indexRowkeys != null) {
+            writeIndexes(rowkey, indexRowkeys);
+        }
+        return rowkey;
+    }
+
+    /**
+     * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+     */
+    public List<byte[]> write(List<InternalLog> logs) throws IOException {
+        final List<Put> puts = new ArrayList<Put>(logs.size());
+        final List<byte[]> result = new ArrayList<byte[]>(logs.size());
+        for (InternalLog log : logs) {
+            final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+            final Put p = new Put(rowkey);
+            populateColumnValues(p, log);
+            puts.add(p);
+            final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+            if (indexRowkeys != null) {
+                writeIndexes(rowkey, indexRowkeys, puts);
+            }
+            result.add(rowkey);
+        }
+        tbl.put(puts);
+        return result;
+    }
+
+    @Override
+    public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException {
+        Put p = new Put(rowkey);
+        populateColumnValues(p, log);
+        tbl.put(p);
+        final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+        if (indexRowkeys != null) {
+            writeIndexes(rowkey, indexRowkeys);
+        }
+    }
+
+    private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
+        for (byte[] indexRowkey : indexRowkeys) {
+            Put p = new Put(indexRowkey);
+            p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+            tbl.put(p);
+        }
+    }
+
+    private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
+        for (byte[] indexRowkey : indexRowkeys) {
+            Put p = new Put(indexRowkey);
+            p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+            puts.add(p);
+            // tbl.put(p);
+        }
+    }
+
 }
index 8276640..066401f 100755 (executable)
@@ -25,115 +25,134 @@ import java.util.Map;
  * TODO we should decouple BaseLog during write time and BaseLog during read time
  */
 public class InternalLog {
-       private String encodedRowkey;
-       private String prefix;
-       private String[] partitions;
-       private long timestamp;
-       private Map<String, byte[]> qualifierValues;
-
-       private Map<String,Object> extraValues;
-       private Map<String, String> tags;
-       private Map<String, List<String>> searchTags;
-       private List<byte[]> indexRowkeys;
-
-       public String getEncodedRowkey() {
-               return encodedRowkey;
-       }
-
-       public void setEncodedRowkey(String encodedRowkey) {
-               this.encodedRowkey = encodedRowkey;
-       }
-       
-       public Map<String, byte[]> getQualifierValues() {
-               return qualifierValues;
-       }
-       public void setQualifierValues(Map<String, byte[]> qualifierValues) {
-               this.qualifierValues = qualifierValues;
-       }
-
-       public Map<String, List<String>> getSearchTags() {
-               return searchTags;
-       }
-       public void setSearchTags(Map<String, List<String>> searchTags) {
-               this.searchTags = searchTags;
-       }
-       public String getPrefix() {
-               return prefix;
-       }
-       public void setPrefix(String prefix) {
-               this.prefix = prefix;
-       }
-       public String[] getPartitions() {
-               return partitions;
-       }
-       public void setPartitions(String[] partitions) {
-               this.partitions = partitions;
-       }
-       public long getTimestamp() {
-               return timestamp;
-       }
-       public void setTimestamp(long timestamp) {
-               this.timestamp = timestamp;
-       }
-       public Map<String, String> getTags() {
-               return tags;
-       }
-       public void setTags(Map<String, String> tags) {
-               this.tags = tags;
-       }
-       public List<byte[]> getIndexRowkeys() {
-               return indexRowkeys;
-       }
-       public void setIndexRowkeys(List<byte[]> indexRowkeys) {
-               this.indexRowkeys = indexRowkeys;
-       }
-       public Map<String, Object> getExtraValues() { return extraValues; }
-       public void setExtraValues(Map<String, Object> extraValues) { this.extraValues = extraValues; }
-
-       public String toString(){
-               StringBuffer sb = new StringBuffer();
-               sb.append(prefix);
-               sb.append("|");
-               sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
-               sb.append("(");
-               sb.append(timestamp);
-               sb.append(")");
-               sb.append("|searchTags:");
-               if(searchTags != null){
-                       for(String tagkey : searchTags.keySet()){
-                               sb.append(tagkey);
-                               sb.append('=');
-                               List<String> tagValues = searchTags.get(tagkey);
-                               sb.append("(");
-                               for(String tagValue : tagValues){
-                                       sb.append(tagValue);
-                                       sb.append(",");
-                               }
-                               sb.append(")");
-                               sb.append(",");
-                       }
-               }
-               sb.append("|tags:");
-               if(tags != null){
-                       for(Map.Entry<String, String> entry : tags.entrySet()){
-                               sb.append(entry.getKey());
-                               sb.append("=");
-                               sb.append(entry.getValue());
-                               sb.append(",");
-                       }
-               }
-               sb.append("|columns:");
-               if(qualifierValues != null){
-                       for(String qualifier : qualifierValues.keySet()){
-                               byte[] value = qualifierValues.get(qualifier);
-                               sb.append(qualifier);
-                               sb.append("=");
-                               if(value != null){
-                                       sb.append(new String(value));
-                               }
-                               sb.append(",");
-                       }
-               }
-               return sb.toString();
-       }
+    private String encodedRowkey;
+    private String prefix;
+    private String[] partitions;
+    private long timestamp;
+    private Map<String, byte[]> qualifierValues;
+
+    private Map<String, Object> extraValues;
+    private Map<String, String> tags;
+    private Map<String, List<String>> searchTags;
+    private List<byte[]> indexRowkeys;
+
+    public String getEncodedRowkey() {
+        return encodedRowkey;
+    }
+
+    public void setEncodedRowkey(String encodedRowkey) {
+        this.encodedRowkey = encodedRowkey;
+    }
+
+    public Map<String, byte[]> getQualifierValues() {
+        return qualifierValues;
+    }
+
+    public void setQualifierValues(Map<String, byte[]> qualifierValues) {
+        this.qualifierValues = qualifierValues;
+    }
+
+    public Map<String, List<String>> getSearchTags() {
+        return searchTags;
+    }
+
+    public void setSearchTags(Map<String, List<String>> searchTags) {
+        this.searchTags = searchTags;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public void setPrefix(String prefix) {
+        this.prefix = prefix;
+    }
+
+    public String[] getPartitions() {
+        return partitions;
+    }
+
+    public void setPartitions(String[] partitions) {
+        this.partitions = partitions;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public Map<String, String> getTags() {
+        return tags;
+    }
+
+    public void setTags(Map<String, String> tags) {
+        this.tags = tags;
+    }
+
+    public List<byte[]> getIndexRowkeys() {
+        return indexRowkeys;
+    }
+
+    public void setIndexRowkeys(List<byte[]> indexRowkeys) {
+        this.indexRowkeys = indexRowkeys;
+    }
+
+    public Map<String, Object> getExtraValues() {
+        return extraValues;
+    }
+
+    public void setExtraValues(Map<String, Object> extraValues) {
+        this.extraValues = extraValues;
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append(prefix);
+        sb.append("|");
+        sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
+        sb.append("(");
+        sb.append(timestamp);
+        sb.append(")");
+        sb.append("|searchTags:");
+        if (searchTags != null) {
+            for (String tagkey : searchTags.keySet()) {
+                sb.append(tagkey);
+                sb.append('=');
+                List<String> tagValues = searchTags.get(tagkey);
+                sb.append("(");
+                for (String tagValue : tagValues) {
+                    sb.append(tagValue);
+                    sb.append(",");
+                }
+                sb.append(")");
+                sb.append(",");
+            }
+        }
+        sb.append("|tags:");
+        if (tags != null) {
+            for (Map.Entry<String, String> entry : tags.entrySet()) {
+                sb.append(entry.getKey());
+                sb.append("=");
+                sb.append(entry.getValue());
+                sb.append(",");
+            }
+        }
+        sb.append("|columns:");
+        if (qualifierValues != null) {
+            for (String qualifier : qualifierValues.keySet()) {
+                byte[] value = qualifierValues.get(qualifier);
+                sb.append(qualifier);
+                sb.append("=");
+                if (value != null) {
+                    sb.append(new String(value));
+                }
+                sb.append(",");
+            }
+        }
+        return sb.toString();
+    }
 }
index 3f748d6..93e714b 100755 (executable)
@@ -20,59 +20,77 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /**
- * TODO: (hchen9) currently we disable firstTimestamp in response avoid breaking older client implementation, but we may need to remove "firstTimestamp" from @JsonIgnoreProperties(ignoreUnknown = true,value={"firstTimestamp"}) to enable the feature later
+ * TODO: (hchen9) currently we disable firstTimestamp in response avoid breaking older client implementation,
+ * but we may need to remove "firstTimestamp" from @JsonIgnoreProperties(ignoreUnknown =
+ * true,value={"firstTimestamp"}) to enable the feature later
  */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true,value={"firstTimestamp"})
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true, value = {
+    "firstTimestamp"
+    })
 public class ListQueryAPIResponseEntity {
-       private boolean success;
-       private String exception;
-       private int totalResults;
-       private long elapsedms;
-       private long lastTimestamp;
-       private long firstTimestamp;
-       public long getFirstTimestamp() {
-               return firstTimestamp;
-       }
-       public void setFirstTimestamp(long firstTimestamp) {
-               this.firstTimestamp = firstTimestamp;
-       }
-       private Object obj;
-
-       public long getElapsedms() {
-               return elapsedms;
-       }
-       public void setElapsedms(long elapsedms) {
-               this.elapsedms = elapsedms;
-       }
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-       public void setException(String exception) {
-               this.exception = exception;
-       }
-       public int getTotalResults() {
-               return totalResults;
-       }
-       public void setTotalResults(int totalResults) {
-               this.totalResults = totalResults;
-       }
-       public long getLastTimestamp() {
-               return lastTimestamp;
-       }
-       public void setLastTimestamp(long lastTimestamp) {
-               this.lastTimestamp = lastTimestamp;
-       }
-       public Object getObj() {
-               return obj;
-       }
-       public void setObj(Object obj) {
-               this.obj = obj;
-       }
-}
\ No newline at end of file
+    private boolean success;
+    private String exception;
+    private int totalResults;
+    private long elapsedms;
+    private long lastTimestamp;
+    private long firstTimestamp;
+
+    public long getFirstTimestamp() {
+        return firstTimestamp;
+    }
+
+    public void setFirstTimestamp(long firstTimestamp) {
+        this.firstTimestamp = firstTimestamp;
+    }
+
+    private Object obj;
+
+    public long getElapsedms() {
+        return elapsedms;
+    }
+
+    public void setElapsedms(long elapsedms) {
+        this.elapsedms = elapsedms;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    public int getTotalResults() {
+        return totalResults;
+    }
+
+    public void setTotalResults(int totalResults) {
+        this.totalResults = totalResults;
+    }
+
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
+
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+
+    public Object getObj() {
+        return obj;
+    }
+
+    public void setObj(Object obj) {
+        this.obj = obj;
+    }
+}
index da1e1ab..a0dd29a 100755 (executable)
@@ -19,10 +19,11 @@ package org.apache.eagle.log.entity;
 import java.io.Closeable;
 import java.io.IOException;
 
-public interface LogReader<T> extends Closeable{
-       public void open() throws IOException;
+public interface LogReader<T> extends Closeable {
+    public void open() throws IOException;
 
-       public void close() throws IOException;
-       
-       public T read() throws IOException;
+    @Override
+    public void close() throws IOException;
+
+    public T read() throws IOException;
 }
index 6ef4ee3..9c10cd4 100644 (file)
@@ -19,14 +19,15 @@ package org.apache.eagle.log.entity;
 import java.io.Closeable;
 import java.io.IOException;
 
-public interface LogWriter extends Closeable{
-       public void flush() throws IOException;
+public interface LogWriter extends Closeable {
+    public void flush() throws IOException;
 
-       public void open() throws IOException;
+    public void open() throws IOException;
 
-       public void close() throws IOException;
+    @Override
+    public void close() throws IOException;
 
-       public byte[] write(InternalLog log) throws IOException;
-       
-       public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException;
-}
\ No newline at end of file
+    public byte[] write(InternalLog log) throws IOException;
+
+    public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException;
+}
index a430393..4bf82e6 100755 (executable)
@@ -28,90 +28,106 @@ import org.apache.eagle.log.entity.meta.Prefix;
 import org.apache.eagle.log.entity.meta.Table;
 import org.apache.eagle.log.entity.meta.TimeSeries;
 
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("eagle_metric")
 @ColumnFamily("f")
 @Prefix("dmeta")
 @Service("MetricMetadataService")
 @TimeSeries(false)
 @Indexes({
-       @Index(name="Index_1_name", columns = { "name" }, unique = true)
-       })
+          @Index(name = "Index_1_name", columns = {
+                                                   "name"
+          }, unique = true)
+    })
 public class MetricMetadataEntity extends TaggedLogAPIEntity {
-       
-       @Column("a")
-       private String storeType;
-       @Column("b")
-       private String displayName;
-       @Column("c")
-       private String defaultDownSamplingFunction;
-       @Column("d")
-       private String defaultAggregateFunction;
-       @Column("e")
-       private String aggFunctions;
-       @Column("f")
-       private String downSamplingFunctions;
-       @Column("g")
-       private String resolutions;
-       @Column("h")
-       private String drillDownPaths;
-       
-       public String getStoreType() {
-               return storeType;
-       }
-       public void setStoreType(String storeType) {
-               this.storeType = storeType;
-               pcs.firePropertyChange("storeType", null, null);
-       }
-       public String getDisplayName() {
-               return displayName;
-       }
-       public void setDisplayName(String displayName) {
-               this.displayName = displayName;
-               pcs.firePropertyChange("displayName", null, null);
-       }
-       public String getDefaultDownSamplingFunction() {
-               return defaultDownSamplingFunction;
-       }
-       public void setDefaultDownSamplingFunction(String defaultDownSamplingFunction) {
-               this.defaultDownSamplingFunction = defaultDownSamplingFunction;
-               pcs.firePropertyChange("defaultDownSamplingFunction", null, null);
-       }
-       public String getDefaultAggregateFunction() {
-               return defaultAggregateFunction;
-       }
-       public void setDefaultAggregateFunction(String defaultAggregateFunction) {
-               this.defaultAggregateFunction = defaultAggregateFunction;
-               pcs.firePropertyChange("defaultAggregateFunction", null, null);
-       }
-       public String getAggFunctions() {
-               return aggFunctions;
-       }
-       public void setAggFunctions(String aggFunctions) {
-               this.aggFunctions = aggFunctions;
-               pcs.firePropertyChange("aggFunctions", null, null);
-       }
-       public String getDownSamplingFunctions() {
-               return downSamplingFunctions;
-       }
-       public void setDownSamplingFunctions(String downSamplingFunctions) {
-               this.downSamplingFunctions = downSamplingFunctions;
-               pcs.firePropertyChange("downSamplingFunctions", null, null);
-       }
-       public String getResolutions() {
-               return resolutions;
-       }
-       public void setResolutions(String resolutions) {
-               this.resolutions = resolutions;
-               pcs.firePropertyChange("resolutions", null, null);
-       }
-       public String getDrillDownPaths() {
-               return drillDownPaths;
-       }
-       public void setDrillDownPaths(String drillDownPaths) {
-               this.drillDownPaths = drillDownPaths;
-               pcs.firePropertyChange("drillDownPaths", null, null);
-       }
-       
+
+    @Column("a")
+    private String storeType;
+    @Column("b")
+    private String displayName;
+    @Column("c")
+    private String defaultDownSamplingFunction;
+    @Column("d")
+    private String defaultAggregateFunction;
+    @Column("e")
+    private String aggFunctions;
+    @Column("f")
+    private String downSamplingFunctions;
+    @Column("g")
+    private String resolutions;
+    @Column("h")
+    private String drillDownPaths;
+
+    public String getStoreType() {
+        return storeType;
+    }
+
+    public void setStoreType(String storeType) {
+        this.storeType = storeType;
+        pcs.firePropertyChange("storeType", null, null);
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+        pcs.firePropertyChange("displayName", null, null);
+    }
+
+    public String getDefaultDownSamplingFunction() {
+        return defaultDownSamplingFunction;
+    }
+
+    public void setDefaultDownSamplingFunction(String defaultDownSamplingFunction) {
+        this.defaultDownSamplingFunction = defaultDownSamplingFunction;
+        pcs.firePropertyChange("defaultDownSamplingFunction", null, null);
+    }
+
+    public String getDefaultAggregateFunction() {
+        return defaultAggregateFunction;
+    }
+
+    public void setDefaultAggregateFunction(String defaultAggregateFunction) {
+        this.defaultAggregateFunction = defaultAggregateFunction;
+        pcs.firePropertyChange("defaultAggregateFunction", null, null);
+    }
+
+    public String getAggFunctions() {
+        return aggFunctions;
+    }
+
+    public void setAggFunctions(String aggFunctions) {
+        this.aggFunctions = aggFunctions;
+        pcs.firePropertyChange("aggFunctions", null, null);
+    }
+
+    public String getDownSamplingFunctions() {
+        return downSamplingFunctions;
+    }
+
+    public void setDownSamplingFunctions(String downSamplingFunctions) {
+        this.downSamplingFunctions = downSamplingFunctions;
+        pcs.firePropertyChange("downSamplingFunctions", null, null);
+    }
+
+    public String getResolutions() {
+        return resolutions;
+    }
+
+    public void setResolutions(String resolutions) {
+        this.resolutions = resolutions;
+        pcs.firePropertyChange("resolutions", null, null);
+    }
+
+    public String getDrillDownPaths() {
+        return drillDownPaths;
+    }
+
+    public void setDrillDownPaths(String drillDownPaths) {
+        this.drillDownPaths = drillDownPaths;
+        pcs.firePropertyChange("drillDownPaths", null, null);
+    }
+
 }
index b0eeaed..890540d 100755 (executable)
@@ -19,10 +19,10 @@ package org.apache.eagle.log.entity;
 import java.util.Map;
 
 public interface QualifierCreationListener {
-       /**
-        * Qualifier <b>display name</b> mapped to qualifier value in bytes[]
-        *
-        * @param qualifiers
-        */
-       public void qualifierCreated(Map<String, byte[]> qualifiers);
+    /**
+     * Qualifier <b>display name</b> mapped to qualifier value in bytes[]
+     *
+     * @param qualifiers
+     */
+    public void qualifierCreated(Map<String, byte[]> qualifiers);
 }
index 88135bb..1225ba7 100644 (file)
  */
 package org.apache.eagle.log.entity;
 
-public class QualifierNotDefinedException extends Exception{
-       /**
-        * 
-        */
-       private static final long serialVersionUID = 1L;
+public class QualifierNotDefinedException extends Exception {
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
 
-       public QualifierNotDefinedException(String message){
-               super(message);
-       }
+    public QualifierNotDefinedException(String message) {
+        super(message);
+    }
 }
index 5154cc4..2ef0680 100755 (executable)
@@ -27,114 +27,122 @@ import org.apache.eagle.log.entity.meta.EntityDefinition;
 import org.apache.eagle.common.ByteUtil;
 
 public class RowkeyBuilder {
-       
-       public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
-       
-       /**
-        * Generate the internal sorted hashmap for tags. Please note the partition tags should not be included in the result map.
-        * @param partitions array of partition tags in order
-        * @param tags tags of the entity
-        * @return the sorted hash map of the tags
-        */
-       public static SortedMap<Integer, Integer> generateSortedTagMap(String[] partitions, Map<String, String> tags) {
-               final SortedMap<Integer, Integer> tagHashMap = new TreeMap<Integer, Integer>();
-               for (Map.Entry<String, String> entry: tags.entrySet()) {
-                       final String tagName = entry.getKey();
-                       final String tagValue = entry.getValue();
-                       // If it's a partition tag, we need to remove it from tag hash list. It need to 
-                       // put to the fix partition hash slot in rowkey.
-                       if (tagValue == null || isPartitionTag(partitions, tagName))
-                               continue;
-                       tagHashMap.put(tagName.hashCode(), tagValue.hashCode());
-               }
-               return tagHashMap;
-       }
-       
-       /**
-        * build rowkey from InternalLog object
-        * @param log internal log entity to write
-        * @return the rowkey of the entity
-        */
-       public static byte[] buildRowkey(InternalLog log) {
-               final String[] partitions = log.getPartitions();
-               final Map<String, String> tags = log.getTags();
-               final SortedMap<Integer, Integer> tagHashMap = generateSortedTagMap(partitions, tags);
-               
-               // reverse timestamp
-               long ts = Long.MAX_VALUE - log.getTimestamp();
-               
-               List<Integer> partitionHashValues = new ArrayList<Integer>();
-               if (partitions != null) {
-                       for (String partition : partitions) {
-                               final String tagValue = tags.get(partition);
-                               if (tagValue != null) {
-                                       partitionHashValues.add(tagValue.hashCode());
-                               } else {
-                                       partitionHashValues.add(EMPTY_PARTITION_DEFAULT_HASH_CODE);
-                               }
-                       }
-               }
-               return buildRowkey(log.getPrefix().hashCode(), partitionHashValues, ts, tagHashMap);
-       }
-       
-       public static long getTimestamp(byte[] rowkey, EntityDefinition ed) {
-               if (!ed.isTimeSeries()) {
-                       return EntityConstants.FIXED_WRITE_TIMESTAMP;
-               }
-               final int offset = (ed.getPartitions() == null) ? 4 : (4 + ed.getPartitions().length * 4);
-               return Long.MAX_VALUE - ByteUtil.bytesToLong(rowkey, offset);
-       }
-       
-       /**
-        * Check if the tagName is one of the partition tags
-        * @param partitions paritition tags of the entity
-        * @param tagName the tag name that needs to check
-        * @return
-        */
-       private static boolean isPartitionTag(String[] partitions, String tagName) {
-               if (partitions != null) {
-                       for (String partition : partitions) {
-                               if (partition.equals(tagName)) {
-                                       return true;
-                               }
-                       }
-               }
-               return false;
-       }
-
-       /**
-        * rowkey is: prefixHash:4 + (partitionValueHash:4)* + timestamp:8 + (tagnameHash:4 + tagvalueHash:4)*
-        * partition fields are sorted by partition definition order, while tag fields are sorted by tag name's 
-        * hash code values. 
-        */
-       private static byte[] buildRowkey(int prefixHash, List<Integer> partitionHashValues, long timestamp, SortedMap<Integer, Integer> tags){
-               // allocate byte array for rowkey
-               final int len = 4 + 8 + tags.size() * (4 + 4) + (partitionHashValues.size() * 4);
-               final byte[] rowkey = new byte[len];
-               int offset = 0;
-
-               // 1. set prefix
-               ByteUtil.intToBytes(prefixHash, rowkey, offset);
-               offset += 4;
-               
-               // 2. set partition
-               for (Integer partHash : partitionHashValues) {
-                       ByteUtil.intToBytes(partHash, rowkey, offset);
-                       offset += 4;
-               }
-               
-               // 3. set timestamp
-               ByteUtil.longToBytes(timestamp, rowkey, offset);
-               offset += 8;
-
-               // 4. set tag key/value hashes
-               for (Map.Entry<Integer, Integer> entry : tags.entrySet()) {
-                       ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
-                       offset += 4;
-                       ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
-                       offset += 4;
-               }
-               
-               return rowkey;
-       }
+
+    public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
+
+    /**
+     * Generate the internal sorted hashmap for tags. Please note the partition tags should not be included in
+     * the result map.
+     *
+     * @param partitions array of partition tags in order
+     * @param tags tags of the entity
+     * @return the sorted hash map of the tags
+     */
+    public static SortedMap<Integer, Integer> generateSortedTagMap(String[] partitions,
+                                                                   Map<String, String> tags) {
+        final SortedMap<Integer, Integer> tagHashMap = new TreeMap<Integer, Integer>();
+        for (Map.Entry<String, String> entry : tags.entrySet()) {
+            final String tagName = entry.getKey();
+            final String tagValue = entry.getValue();
+            // If it's a partition tag, we need to remove it from tag hash list. It need to
+            // put to the fix partition hash slot in rowkey.
+            if (tagValue == null || isPartitionTag(partitions, tagName)) {
+                continue;
+            }
+            tagHashMap.put(tagName.hashCode(), tagValue.hashCode());
+        }
+        return tagHashMap;
+    }
+
+    /**
+     * build rowkey from InternalLog object
+     *
+     * @param log internal log entity to write
+     * @return the rowkey of the entity
+     */
+    public static byte[] buildRowkey(InternalLog log) {
+        final String[] partitions = log.getPartitions();
+        final Map<String, String> tags = log.getTags();
+        final SortedMap<Integer, Integer> tagHashMap = generateSortedTagMap(partitions, tags);
+
+        // reverse timestamp
+        long ts = Long.MAX_VALUE - log.getTimestamp();
+
+        List<Integer> partitionHashValues = new ArrayList<Integer>();
+        if (partitions != null) {
+            for (String partition : partitions) {
+                final String tagValue = tags.get(partition);
+                if (tagValue != null) {
+                    partitionHashValues.add(tagValue.hashCode());
+                } else {
+                    partitionHashValues.add(EMPTY_PARTITION_DEFAULT_HASH_CODE);
+                }
+            }
+        }
+        return buildRowkey(log.getPrefix().hashCode(), partitionHashValues, ts, tagHashMap);
+    }
+
+    /**
+     * rowkey is: prefixHash:4 + (partitionValueHash:4)* + timestamp:8 + (tagnameHash:4 + tagvalueHash:4)*
+     * partition fields are sorted by partition definition order, while tag fields are sorted by tag name's
+     * hash code values.
+     */
+    private static byte[] buildRowkey(int prefixHash, List<Integer> partitionHashValues, long timestamp,
+                                      SortedMap<Integer, Integer> tags) {
+        // allocate byte array for rowkey
+        final int len = 4 + 8 + tags.size() * (4 + 4) + (partitionHashValues.size() * 4);
+        final byte[] rowkey = new byte[len];
+        int offset = 0;
+
+        // 1. set prefix
+        ByteUtil.intToBytes(prefixHash, rowkey, offset);
+        offset += 4;
+
+        // 2. set partition
+        for (Integer partHash : partitionHashValues) {
+            ByteUtil.intToBytes(partHash, rowkey, offset);
+            offset += 4;
+        }
+
+        // 3. set timestamp
+        ByteUtil.longToBytes(timestamp, rowkey, offset);
+        offset += 8;
+
+        // 4. set tag key/value hashes
+        for (Map.Entry<Integer, Integer> entry : tags.entrySet()) {
+            ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
+            offset += 4;
+            ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
+            offset += 4;
+        }
+
+        return rowkey;
+    }
+
+    public static long getTimestamp(byte[] rowkey, EntityDefinition ed) {
+        if (!ed.isTimeSeries()) {
+            return EntityConstants.FIXED_WRITE_TIMESTAMP;
+        }
+        final int offset = (ed.getPartitions() == null) ? 4 : (4 + ed.getPartitions().length * 4);
+        return Long.MAX_VALUE - ByteUtil.bytesToLong(rowkey, offset);
+    }
+
+    /**
+     * Check if the tagName is one of the partition tags
+     *
+     * @param partitions partition tags of the entity
+     * @param tagName the tag name that needs to check
+     * @return true if tagName is one of the partition tags, false otherwise
+     */
+    private static boolean isPartitionTag(String[] partitions, String tagName) {
+        if (partitions != null) {
+            for (String partition : partitions) {
+                if (partition.equals(tagName)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
 }
index 953d12b..6189983 100644 (file)
@@ -18,28 +18,33 @@ package org.apache.eagle.log.entity;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class RowkeyQueryAPIResponseEntity {
-       private boolean success;
-       private String exception;
-       private Object obj;
-
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-       public void setException(String exception) {
-               this.exception = exception;
-       }
-       public Object getObj() {
-               return obj;
-       }
-       public void setObj(Object obj) {
-               this.obj = obj;
-       }
-}
\ No newline at end of file
+    private boolean success;
+    private String exception;
+    private Object obj;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    public Object getObj() {
+        return obj;
+    }
+
+    public void setObj(Object obj) {
+        this.obj = obj;
+    }
+}
index 68db2c4..e9c0dca 100755 (executable)
@@ -23,126 +23,138 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * search condition includes the following:
- * 1. prefix - part of rowkey
- * 2. startTime,endTime - timestamp, part of rowkey
- * 3. hbase filter converted from query 
- * 4. aggregate parameters
- * 4. sort options
- * 5. output fields and tags
- * 6. entityName
- * 7. pagination: pageSize and startRowkey
+ * search condition includes the following: 1. prefix - part of rowkey 2. startTime,endTime - timestamp, part
+ * of rowkey 3. hbase filter converted from query 4. aggregate parameters 4. sort options 5. output fields and
+ * tags 6. entityName 7. pagination: pageSize and startRowkey
  */
-public class SearchCondition{
-       private long startTime;
-       private long endTime;
-       private Filter filter;
-       private List<String> outputFields;
-       private boolean outputAll;
-       private long pageSize;
-       private String startRowkey;
-       private String entityName;
-       private List<String> partitionValues;
-       private ORExpression queryExpression;
-
-       public boolean isOutputVerbose() {
-               return outputVerbose;
-       }
-
-       public void setOutputVerbose(boolean outputVerbose) {
-               this.outputVerbose = outputVerbose;
-       }
-
-       public Map<String, String> getOutputAlias() {
-               return outputAlias;
-       }
-
-       public void setOutputAlias(Map<String, String> outputAlias) {
-               this.outputAlias = outputAlias;
-       }
-
-       private boolean outputVerbose;
-       private Map<String,String> outputAlias;
-
-       /**
-        * copy constructor
-        * @param sc
-        */
-       public SearchCondition(SearchCondition sc){
-               this.startTime = sc.startTime;
-               this.endTime = sc.endTime;
-               this.filter = sc.filter;
-               this.outputFields = sc.outputFields;
-               this.pageSize = sc.pageSize;
-               this.startRowkey = sc.startRowkey;
-               this.entityName = sc.entityName;
-               this.partitionValues = sc.partitionValues;
-               this.queryExpression = sc.queryExpression;
-       }
-       
-       public SearchCondition(){
-       }
-       
-       public Filter getFilter() {
-               return filter;
-       }
-       public void setFilter(Filter filter) {
-               this.filter = filter;
-       }
-       public long getPageSize() {
-               return pageSize;
-       }
-       public void setPageSize(long pageSize) {
-               this.pageSize = pageSize;
-       }
-       public String getStartRowkey() {
-               return startRowkey;
-       }
-       public void setStartRowkey(String startRowkey) {
-               this.startRowkey = startRowkey;
-       }
-       public String getEntityName() {
-               return entityName;
-       }
-       public void setEntityName(String entityName) {
-               this.entityName = entityName;
-       }
-       public List<String> getOutputFields() {
-               return outputFields;
-       }
-       public void setOutputFields(List<String> outputFields) {
-               this.outputFields = outputFields;
-       }
-       public long getStartTime() {
-               return startTime;
-       }
-       public void setStartTime(long startTime) {
-               this.startTime = startTime;
-       }
-       public long getEndTime() {
-               return endTime;
-       }
-       public void setEndTime(long endTime) {
-               this.endTime = endTime;
-       }
-       public List<String> getPartitionValues() {
-               return partitionValues;
-       }
-       public void setPartitionValues(List<String> partitionValues) {
-               this.partitionValues = partitionValues;
-       }
-       public ORExpression getQueryExpression() {
-               return queryExpression;
-       }
-       public void setQueryExpression(ORExpression queryExpression) {
-               this.queryExpression = queryExpression;
-       }
-
-       public boolean isOutputAll() {
-               return outputAll;
-       }
-
-       public void setOutputAll(boolean outputAll) {
-               this.outputAll = outputAll;
-       }
+public class SearchCondition {
+    private long startTime;
+    private long endTime;
+    private Filter filter;
+    private List<String> outputFields;
+    private boolean outputAll;
+    private long pageSize;
+    private String startRowkey;
+    private String entityName;
+    private List<String> partitionValues;
+    private ORExpression queryExpression;
+
+    public boolean isOutputVerbose() {
+        return outputVerbose;
+    }
+
+    public void setOutputVerbose(boolean outputVerbose) {
+        this.outputVerbose = outputVerbose;
+    }
+
+    public Map<String, String> getOutputAlias() {
+        return outputAlias;
+    }
+
+    public void setOutputAlias(Map<String, String> outputAlias) {
+        this.outputAlias = outputAlias;
+    }
+
+    private boolean outputVerbose;
+    private Map<String, String> outputAlias;
+
+    /**
+     * copy constructor
+     * 
+     * @param sc
+     */
+    public SearchCondition(SearchCondition sc) {
+        this.startTime = sc.startTime;
+        this.endTime = sc.endTime;
+        this.filter = sc.filter;
+        this.outputFields = sc.outputFields;
+        this.pageSize = sc.pageSize;
+        this.startRowkey = sc.startRowkey;
+        this.entityName = sc.entityName;
+        this.partitionValues = sc.partitionValues;
+        this.queryExpression = sc.queryExpression;
+    }
+
+    public SearchCondition() {
+    }
+
+    public Filter getFilter() {
+        return filter;
+    }
+
+    public void setFilter(Filter filter) {
+        this.filter = filter;
+    }
+
+    public long getPageSize() {
+        return pageSize;
+    }
+
+    public void setPageSize(long pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    public String getStartRowkey() {
+        return startRowkey;
+    }
+
+    public void setStartRowkey(String startRowkey) {
+        this.startRowkey = startRowkey;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    public List<String> getOutputFields() {
+        return outputFields;
+    }
+
+    public void setOutputFields(List<String> outputFields) {
+        this.outputFields = outputFields;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public List<String> getPartitionValues() {
+        return partitionValues;
+    }
+
+    public void setPartitionValues(List<String> partitionValues) {
+        this.partitionValues = partitionValues;
+    }
+
+    public ORExpression getQueryExpression() {
+        return queryExpression;
+    }
+
+    public void setQueryExpression(ORExpression queryExpression) {
+        this.queryExpression = queryExpression;
+    }
+
+    public boolean isOutputAll() {
+        return outputAll;
+    }
+
+    public void setOutputAll(boolean outputAll) {
+        this.outputAll = outputAll;
+    }
 }
index 005a2e2..36aa0b6 100755 (executable)
@@ -20,36 +20,39 @@ import java.util.ArrayList;
 import java.util.List;
 
 public abstract class StreamReader {
-       protected List<EntityCreationListener> _listeners = new ArrayList<EntityCreationListener>();
+    protected List<EntityCreationListener> listeners = new ArrayList<EntityCreationListener>();
 
-       /**
-        * Listener can be only notified after it is added to listener list
-        * @param listener
-        */
-       public synchronized void register(EntityCreationListener listener){
-               _listeners.add(listener);
-       }
-       
-       /**
-        * Listener can not get notification once after it is removed from listener list
-        * @param listener
-        */
-       public synchronized void unregister(EntityCreationListener listener){
-               _listeners.remove(listener);
-       }
-       
-       public abstract void readAsStream() throws Exception;
-       
-       /**
-        * Get scanned last entity timestamp
-        * 
-        * @return
-        */
-       public abstract long getLastTimestamp();
-       
-       /**
-        * Get scanned first entity timestamp
-        * @return
-        */
-       public abstract long getFirstTimestamp();
-}
\ No newline at end of file
+    /**
+     * Listener can be only notified after it is added to listener list
+     *
+     * @param listener
+     */
+    public synchronized void register(EntityCreationListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Listener can not get notification once after it is removed from listener list
+     *
+     * @param listener
+     */
+    public synchronized void unregister(EntityCreationListener listener) {
+        listeners.remove(listener);
+    }
+
+    public abstract void readAsStream() throws Exception;
+
+    /**
+     * Get scanned last entity timestamp
+     *
+     * @return the timestamp of the last scanned entity
+     */
+    public abstract long getLastTimestamp();
+
+    /**
+     * Get scanned first entity timestamp
+     *
+     * @return the timestamp of the first scanned entity
+     */
+    public abstract long getFirstTimestamp();
+}
index 0d71e10..6ecf93c 100755 (executable)
@@ -37,15 +37,11 @@ import java.io.IOException;
 import java.util.*;\r
 \r
 /**\r
- * BooleanExpressionComparator\r
- *\r
- * Currently support double expression only.\r
- *\r
- * TODO: 1) thread-safe? 2) Rewrite filter expression to evaluate once\r
- *\r
+ * BooleanExpressionComparator Currently support double expression only. TODO: 1) thread-safe? 2) Rewrite\r
+ * filter expression to evaluate once\r
  */\r
 public class BooleanExpressionComparator implements WritableComparable<List<KeyValue>> {\r
-    private final static Logger LOG = LoggerFactory.getLogger(BooleanExpressionComparator.class);\r
+    private static final Logger LOG = LoggerFactory.getLogger(BooleanExpressionComparator.class);\r
 \r
     // Should be Writable\r
     private QualifierFilterEntity filterEntity;\r
@@ -62,22 +58,23 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
 \r
     private Set<String> requiredFields = new HashSet<String>();\r
 \r
-    public BooleanExpressionComparator(){}\r
+    public BooleanExpressionComparator() {\r
+    }\r
 \r
-    public BooleanExpressionComparator(QualifierFilterEntity entity,EntityDefinition ed){\r
-        this.filterEntity  = entity;\r
+    public BooleanExpressionComparator(QualifierFilterEntity entity, EntityDefinition ed) {\r
+        this.filterEntity = entity;\r
         this.ed = ed;\r
         try {\r
             this.init();\r
         } catch (Exception ex) {\r
             // Client side expression validation to fast fail if having error\r
-            LOG.error("Got exception: "+ex.getMessage(),ex);\r
-            throw new ExpressionEvaluationException(ex.getMessage(),ex);\r
+            LOG.error("Got exception: " + ex.getMessage(), ex);\r
+            throw new ExpressionEvaluationException(ex.getMessage(), ex);\r
         }\r
     }\r
 \r
     private void init() throws ParsiiInvalidException, ParseException {\r
-        LOG.info("Filter expression: "+filterEntity.toString());\r
+        LOG.info("Filter expression: " + filterEntity.toString());\r
         if (filterEntity.getKey() != null) {\r
             if (filterEntity.getKeyType() == TokenType.NUMBER) {\r
                 leftValue = Double.parseDouble(filterEntity.getKey());\r
@@ -100,11 +97,14 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
             throw new IllegalStateException("QualifierFilterEntity value is null");\r
         }\r
 \r
-        if (this.filterEntity.getOp() == null)\r
+        if (this.filterEntity.getOp() == null) {\r
             throw new IllegalStateException("QualifierFilterEntity op is null");\r
+        }\r
         this.func = _opExprFuncMap.get(this.filterEntity.getOp());\r
-        if (this.func == null)\r
-            throw new IllegalStateException("No boolean evaluation function found for operation: " + this.filterEntity.getOp());\r
+        if (this.func == null) {\r
+            throw new IllegalStateException("No boolean evaluation function found for operation: "\r
+                                            + this.filterEntity.getOp());\r
+        }\r
     }\r
 \r
     /**\r
@@ -114,24 +114,26 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
      * @return evaluation result as true (1) or false (0)\r
      * @throws Exception\r
      */\r
-    private boolean eval(Map<String,Double> context) throws Exception {\r
-        if(filterEntity.getKeyType() != TokenType.NUMBER){\r
-            leftValue = eval(filterEntity.getKey(),context);\r
+    private boolean eval(Map<String, Double> context) throws Exception {\r
+        if (filterEntity.getKeyType() != TokenType.NUMBER) {\r
+            leftValue = eval(filterEntity.getKey(), context);\r
         }\r
-        if(filterEntity.getValueType() != TokenType.NUMBER){\r
-            rightValue = eval(filterEntity.getValue(),context);\r
+        if (filterEntity.getValueType() != TokenType.NUMBER) {\r
+            rightValue = eval(filterEntity.getValue(), context);\r
         }\r
-        if(Double.isInfinite(leftValue) || Double.isInfinite(rightValue)){\r
-//            if(LOG.isDebugEnabled()) {\r
+        if (Double.isInfinite(leftValue) || Double.isInfinite(rightValue)) {\r
+            // if(LOG.isDebugEnabled()) {\r
             if (Double.isInfinite(leftValue)) {\r
-                LOG.warn("Evaluation result of key: " + this.filterEntity.getKey() + " is " + leftValue + " (Infinite), ignore");\r
+                LOG.warn("Evaluation result of key: " + this.filterEntity.getKey() + " is " + leftValue\r
+                         + " (Infinite), ignore");\r
             } else {\r
-                LOG.warn("Evaluation result of value: "+this.filterEntity.getValue()+" is "+rightValue+" (Infinite), ignore");\r
+                LOG.warn("Evaluation result of value: " + this.filterEntity.getValue() + " is " + rightValue\r
+                         + " (Infinite), ignore");\r
             }\r
-//            }\r
+            // }\r
             return false;\r
         }\r
-        return func.eval(leftValue,rightValue);\r
+        return func.eval(leftValue, rightValue);\r
     }\r
 \r
     /**\r
@@ -142,38 +144,40 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
      * @return\r
      * @throws Exception\r
      */\r
-    private double eval(String expr,Map<String,Double> context) throws Exception {\r
+    private double eval(String expr, Map<String, Double> context) throws Exception {\r
         return ExpressionParser.parse(expr).eval(context);\r
     }\r
 \r
     /**\r
-     *\r
      * @param row List[KeyValue] All key values in a row\r
-     *\r
      * @return 0 to filter out row [false], otherwise to include row into scanner [true]\r
      */\r
     @Override\r
     public int compareTo(List<KeyValue> row) {\r
-        Map<String,Double> context = new HashMap<String, Double>();\r
-        for(KeyValue kv:row){\r
+        Map<String, Double> context = new HashMap<String, Double>();\r
+        for (KeyValue kv : row) {\r
             String qualifierName = new String(kv.getQualifier());\r
 \r
             // Because assume just handle about double value\r
             // so ignore tag whose value is String\r
-            if(!this.ed.isTag(qualifierName)){\r
+            if (!this.ed.isTag(qualifierName)) {\r
                 Qualifier qualifier = this.ed.getQualifierNameMap().get(qualifierName);\r
                 String displayName = qualifier.getDisplayName();\r
-                if(displayName == null) displayName = qualifierName;\r
+                if (displayName == null) {\r
+                    displayName = qualifierName;\r
+                }\r
                 try {\r
-                    if(this.requiredFields.contains(displayName)) {\r
+                    if (this.requiredFields.contains(displayName)) {\r
                         EntitySerDeser serDeser = qualifier.getSerDeser();\r
-                        double value = EntityQualifierUtils.convertObjToDouble(serDeser.deserialize(kv.getValue()));\r
+                        double value = EntityQualifierUtils\r
+                            .convertObjToDouble(serDeser.deserialize(kv.getValue()));\r
                         if (Double.isNaN(value)) {\r
                             context.put(displayName, value);\r
                         }\r
                     }\r
-                }catch (Exception ex){\r
-                    LOG.warn("Failed to parse value of field "+displayName+" as double, ignore: "+ex.getMessage(),ex);\r
+                } catch (Exception ex) {\r
+                    LOG.warn("Failed to parse value of field " + displayName + " as double, ignore: "\r
+                             + ex.getMessage(), ex);\r
                 }\r
             }\r
         }\r
@@ -182,22 +186,22 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
 \r
     /**\r
      * @param context Map[String,Double]\r
-     *\r
      * @return context.keySet().containsAll(this.requiredFields) && eval(context) ? 1:0;\r
      */\r
-    int compareTo(Map<String,Double> context){\r
+    int compareTo(Map<String, Double> context) {\r
         try {\r
-            if(context.keySet().containsAll(this.requiredFields)){\r
-                return eval(context)? 1:0;\r
-            }else{\r
-                if(LOG.isDebugEnabled()) {\r
-                    LOG.debug("Require variables: [" + StringUtils.join(this.requiredFields, ",") + "], but just given: [" + StringUtils.join(context.keySet(), ",") + "]");\r
+            if (context.keySet().containsAll(this.requiredFields)) {\r
+                return eval(context) ? 1 : 0;\r
+            } else {\r
+                if (LOG.isDebugEnabled()) {\r
+                    LOG.debug("Require variables: [" + StringUtils.join(this.requiredFields, ",")\r
+                              + "], but just given: [" + StringUtils.join(context.keySet(), ",") + "]");\r
                 }\r
                 return 0;\r
             }\r
         } catch (Exception e) {\r
-            LOG.error(e.getMessage(),e);\r
-            throw new ExpressionEvaluationException(e.getMessage(),e);\r
+            LOG.error(e.getMessage(), e);\r
+            throw new ExpressionEvaluationException(e.getMessage(), e);\r
         }\r
     }\r
 \r
@@ -216,31 +220,31 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
 \r
         try {\r
             this.init();\r
-        } catch (Exception ex){\r
-            LOG.error("Got exception: "+ex.getMessage(),ex);\r
-            throw new IOException(ex.getMessage(),ex);\r
+        } catch (Exception ex) {\r
+            LOG.error("Got exception: " + ex.getMessage(), ex);\r
+            throw new IOException(ex.getMessage(), ex);\r
         }\r
     }\r
 \r
-    private static Map<ComparisonOperator,BooleanExprFunc> _opExprFuncMap = new HashMap<ComparisonOperator, BooleanExprFunc>();\r
+    private static Map<ComparisonOperator, BooleanExprFunc> _opExprFuncMap = new HashMap<ComparisonOperator, BooleanExprFunc>();\r
 \r
     static {\r
-        _opExprFuncMap.put(ComparisonOperator.EQUAL,new EqualExprFunc());\r
-        _opExprFuncMap.put(ComparisonOperator.IS,new EqualExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.EQUAL, new EqualExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.IS, new EqualExprFunc());\r
 \r
-        _opExprFuncMap.put(ComparisonOperator.NOT_EQUAL,new NotEqualExprFunc());\r
-        _opExprFuncMap.put(ComparisonOperator.IS_NOT,new NotEqualExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.NOT_EQUAL, new NotEqualExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.IS_NOT, new NotEqualExprFunc());\r
 \r
-        _opExprFuncMap.put(ComparisonOperator.LESS,new LessExprFunc());\r
-        _opExprFuncMap.put(ComparisonOperator.LESS_OR_EQUAL,new LessOrEqualExprFunc());\r
-        _opExprFuncMap.put(ComparisonOperator.GREATER,new GreaterExprFunc());\r
-        _opExprFuncMap.put(ComparisonOperator.GREATER_OR_EQUAL,new GreaterOrEqualExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.LESS, new LessExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.LESS_OR_EQUAL, new LessOrEqualExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.GREATER, new GreaterExprFunc());\r
+        _opExprFuncMap.put(ComparisonOperator.GREATER_OR_EQUAL, new GreaterOrEqualExprFunc());\r
 \r
         // "Life should be much better with functional programming language" - Hao Chen Nov 18th, 2014\r
     }\r
 \r
     private static interface BooleanExprFunc {\r
-        boolean eval(double val1,double val2);\r
+        boolean eval(double val1, double val2);\r
     }\r
 \r
     private static class EqualExprFunc implements BooleanExprFunc {\r
@@ -249,6 +253,7 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
             return val1 == val2;\r
         }\r
     }\r
+\r
     private static class NotEqualExprFunc implements BooleanExprFunc {\r
         @Override\r
         public boolean eval(double val1, double val2) {\r
@@ -262,18 +267,21 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
             return val1 < val2;\r
         }\r
     }\r
+\r
     private static class LessOrEqualExprFunc implements BooleanExprFunc {\r
         @Override\r
         public boolean eval(double val1, double val2) {\r
             return val1 <= val2;\r
         }\r
     }\r
+\r
     private static class GreaterExprFunc implements BooleanExprFunc {\r
         @Override\r
         public boolean eval(double val1, double val2) {\r
             return val1 > val2;\r
         }\r
     }\r
+\r
     private static class GreaterOrEqualExprFunc implements BooleanExprFunc {\r
         @Override\r
         public boolean eval(double val1, double val2) {\r
@@ -281,13 +289,15 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
         }\r
     }\r
 \r
-    public static class ExpressionEvaluationException extends RuntimeException{\r
+    public static class ExpressionEvaluationException extends RuntimeException {\r
         public ExpressionEvaluationException(String message, Throwable cause) {\r
             super(message, cause);\r
         }\r
+\r
         public ExpressionEvaluationException(String message) {\r
             super(message);\r
         }\r
+\r
         public ExpressionEvaluationException(Throwable cause) {\r
             super(cause);\r
         }\r
@@ -295,6 +305,6 @@ public class BooleanExpressionComparator implements WritableComparable<List<KeyV
 \r
     @Override\r
     public String toString() {\r
-        return this.getClass().getSimpleName()+" ("+this.filterEntity.toString()+")";\r
+        return this.getClass().getSimpleName() + " (" + this.filterEntity.toString() + ")";\r
     }\r
-}
\ No newline at end of file
+}\r
index 8209445..9e736ae 100755 (executable)
@@ -34,552 +34,542 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * the steps of building hbase filters
- * 1. receive ORExpression from eagle-antlr
- * 2. iterate all ANDExpression in ORExpression
- *    2.1 put each ANDExpression to a new filter list with MUST_PASS_ONE option
- *    2.2 iterate all AtomicExpression in ANDExpression
- *       2.2.1 group AtomicExpression into 2 groups by looking up metadata, one is for tag filters, the other is for column filters
- *       2.2.2 put the above 2 filters to a filter list with MUST_PASS_ALL option
+ * The steps of building hbase filters: 1. receive ORExpression from eagle-antlr; 2. iterate all
+ * ANDExpression in ORExpression: 2.1 put each ANDExpression to a new filter list with MUST_PASS_ONE option;
+ * 2.2 iterate all AtomicExpression in ANDExpression: 2.2.1 group AtomicExpression into 2 groups by looking
+ * up metadata, one is for tag filters, the other is for column filters; 2.2.2 put the above 2 filters to a
+ * filter list with MUST_PASS_ALL option.
  */
 public class HBaseFilterBuilder {
-       private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class);
-       
-       /**
-        * syntax is @<fieldname>
-        */
-//     private static final String fnRegex = "^@(.*)$";
-       private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex);
-       private static final Charset _defaultCharset = Charset.forName("ISO-8859-1");
-
-       private ORExpression _orExpr;
-       private EntityDefinition _ed;
-       private boolean _filterIfMissing;
-       private Charset _charset = _defaultCharset;
-
-       /**
-        * TODO: Verify performance impact
-        *
-        * @return
-        */
-       public Set<String> getFilterFields() {
-               return _filterFields;
-       }
-
-       /**
-        * Just add filter fields for expression filter
-        */
-       private Set<String> _filterFields;
-
-       public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
-               this(ed, orExpr, false);
-       }
-       
-       public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) {
-               this._ed = ed;
-               this._orExpr = orExpr;
-               this._filterIfMissing = filterIfMissing;
-       }
-       
-       public void setCharset(String charsetName){
-               _charset = Charset.forName(charsetName);
-       }
-       
-       public Charset getCharset(){
-               return _charset;
-       }
-       
-       /**
-        * Because we don't have metadata for tag, we regard non-qualifer field as tag. So one field possibly is not a real tag when this function return true. This happens
-        * when a user input an wrong field name which is neither tag or qualifier
-        *   
-        * @param field
-        */
-       private boolean isTag(String field){
-               return _ed.isTag(field);
-       }
-       
-       /**
-        * check whether this field is one entity attribute or not 
-        * @param fieldName
-        * @return
-        */
-       private String parseEntityAttribute(String fieldName){
-               Matcher m = _fnPattern.matcher(fieldName);
-               if(m.find()){
-                       return m.group(1);
-               }
-               return null;
-       }
-
-       /**
-        * Return the partition values for each or expression. The size of the returned list should be equal to
-        * the size of FilterList that {@link #buildFilters()} returns.
-        * 
-        * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined, 
-        * for the entity, internally We need to spawn multiple queries and send one query for each partition.
-        * 
-        * @return Return the partition values for each or expression. Return null if the entity doesn't support
-        * partition
-        */
-       public List<String[]> getPartitionValues() {
-               final String[] partitions = _ed.getPartitions();
-               if (partitions == null || partitions.length == 0) {
-                       return null;
-               }
-               final List<String[]> result = new ArrayList<String[]>();
-               final Map<String, String> partitionKeyValueMap = new HashMap<String, String>();
-               for(ANDExpression andExpr : _orExpr.getANDExprList()) {
-                       partitionKeyValueMap.clear();
-                       for(AtomicExpression ae : andExpr.getAtomicExprList()) {
-                               // TODO temporarily ignore those fields which are not for attributes
-                               if(ae.getKeyType() == TokenType.ID) {
-                                       final String fieldName = parseEntityAttribute(ae.getKey());
-                                       if (fieldName == null) {
-                                               LOG.warn(fieldName + " field does not have format @<FieldName>, ignored");
-                                               continue;
-                                       }
-                                       if (_ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) {
-                                               final String value = ae.getValue();
-                                               partitionKeyValueMap.put(fieldName, value);
-                                       }
-                               }
-                       }
-                       final String[] values = new String[partitions.length];
-                       result.add(values);
-                       for (int i = 0; i < partitions.length; ++i) {
-                               final String partition = partitions[i];
-                               final String value = partitionKeyValueMap.get(partition);
-                               values[i] = value;
-                       }
-               }
-               return result;
-       }
-
-       /**
-        * @see org.apache.eagle.query.parser.TokenType
-        *
-        * @return
-        */
-       public FilterList buildFilters(){
-               // TODO: Optimize to select between row filter or column filter for better performance
-               // Use row key filter priority by default
-               boolean rowFilterPriority = true;
-
-               FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
-               for(ANDExpression andExpr : _orExpr.getANDExprList()){
-                       
-                       FilterList list = new FilterList(Operator.MUST_PASS_ALL);
-                       Map<String, List<String>> tagFilters = new HashMap<String, List<String>>();
-                       List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>();
-//                     List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>();
-
-                       // TODO refactor not to use too much if/else
-                       for(AtomicExpression ae : andExpr.getAtomicExprList()){
-                               // TODO temporarily ignore those fields which are not for attributes
-
-                               String fieldName = ae.getKey();
-                               if(ae.getKeyType() == TokenType.ID){
-                                       fieldName = parseEntityAttribute(fieldName);
-                                       if(fieldName == null){
-                                               LOG.warn(fieldName + " field does not have format @<FieldName>, ignored");
-                                               continue;
-                                       }
-                               }
-
-                               String value = ae.getValue();
-                               ComparisonOperator op = ae.getOp();
-                               TokenType keyType = ae.getKeyType();
-                               TokenType valueType = ae.getValueType();
-                               QualifierFilterEntity entry = new QualifierFilterEntity(fieldName,value,op,keyType,valueType);
-
-                               // TODO Exact match, need to add escape for those special characters here, including:
-                               // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|"
-
-                               if(keyType == TokenType.ID && isTag(fieldName)){
-                                       if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
-                                                       && !TokenType.NULL.equals(valueType))
-                                       {
-                                               // Use RowFilter for equal TAG
-                                               if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
-                                               tagFilters.get(fieldName).add(value);
-                                       } else if (rowFilterPriority && ComparisonOperator.IN.equals(op))
-                                       {
-                                               // Use RowFilter here by default
-                                               if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
-                                               tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value));
-                                       } else if (ComparisonOperator.LIKE.equals(op)
-                                               || ComparisonOperator.NOT_LIKE.equals(op)
-                                               || ComparisonOperator.CONTAINS.equals(op)
-                                               || ComparisonOperator.NOT_CONTAINS.equals(op)
-                                               || ComparisonOperator.IN.equals(op)
-                                               || ComparisonOperator.IS.equals(op)
-                                               || ComparisonOperator.IS_NOT.equals(op)
-                                               || ComparisonOperator.NOT_EQUAL.equals(op)
-                                               || ComparisonOperator.EQUAL.equals(op)
-                                               || ComparisonOperator.NOT_IN.equals(op))
-                                       {
-                                               qualifierFilters.add(entry);
-                                       } else
-                                       {
-                                               LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName + " yet, going to ignore");
-                                               throw new IllegalArgumentException("Don't support operation: "+op+" on tag field: "+fieldName+", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains");
-                                       }
-                               }else{
-                                       qualifierFilters.add(entry);
-                               }
-                       }
-
-                       // Build RowFilter for equal tags
-                       list.addFilter(buildTagFilter(tagFilters));
-
-                       // Build SingleColumnValueFilter
-                       FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters);
-                       if(qualifierFilterList != null && qualifierFilterList.getFilters().size()>0){
-                               list.addFilter(qualifierFilterList);
-                       }else {
-                               if(LOG.isDebugEnabled()) LOG.debug("Ignore empty qualifier filter from "+qualifierFilters.toString());
-                       }
-                       fltList.addFilter(list);
-               }
-               LOG.info("Query: " + _orExpr.toString() + " => Filter: " + fltList.toString());
-               return fltList;
-       }
-       
-       /**
-        * _charset is used to decode the byte array, in hbase server, RegexStringComparator uses the same
-        * charset to decode the byte array stored in qualifier
-        * for tag filter regex, it's always ISO-8859-1 as it only comes from String's hashcode (Integer)
-        * Note: regex comparasion is to compare String
-        */
-       protected Filter buildTagFilter(Map<String, List<String>> tagFilters){
-               RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters));
-               regexStringComparator.setCharset(_charset);
-               RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator);
-               return filter;
-       }
-       
-       /**
-        * all qualifiers' condition must be satisfied.
-        *
-        * <H1>Use RegexStringComparator for:</H1>
-        *      IN
-        *      LIKE
-        *      NOT_LIKE
-        *
-        * <H1>Use SubstringComparator for:</H1>
-        *      CONTAINS
-        *
-        * <H1>Use EntityQualifierHelper for:</H1>
-        *      EQUALS
-        *      NOT_EUQALS
-        *      LESS
-        *      LESS_OR_EQUAL
-        *      GREATER
-        *      GREATER_OR_EQUAL
-        *
-        * <H2>
-        *     TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper
-        * </H2>
-        *
-        * @param qualifierFilters
-        * @return
-        */
-       protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters){
-               FilterList list = new FilterList(Operator.MUST_PASS_ALL);
-               // iterate all the qualifiers
-               for(QualifierFilterEntity entry : qualifierFilters){
-                       // if contains expression based filter
-                       if(entry.getKeyType() == TokenType.EXP
-                                       || entry.getValueType() == TokenType.EXP
-                                       || entry.getKeyType() != TokenType.ID){
-                               if(!EagleConfigFactory.load().isCoprocessorEnabled()) {
-                                       LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: " + entry.toString());
-                               }
-                               list.addFilter(buildExpressionBasedFilter(entry));
-                               continue;
-                       }
-
-                       // else using SingleColumnValueFilter
-                       String qualifierName = entry.getKey();
-                       if(!isTag(entry.getKey())){
-                               Qualifier qualifier = _ed.getDisplayNameMap().get(entry.getKey());
-                               qualifierName = qualifier.getQualifierName();
-                       }
-
-                       // Comparator to be used for building HBase Filter
-                       // WritableByteArrayComparable comparator;
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class);
+
+    /*
+     * syntax is @<fieldname>
+     */
+    // private static final String fnRegex = "^@(.*)$";
+    private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex);
+    private static final Charset _defaultCharset = Charset.forName("ISO-8859-1");
+
+    private ORExpression orExpr;
+    private EntityDefinition ed;
+    private boolean filterIfMissing;
+    private Charset charset = _defaultCharset;
+
+    /**
+     * TODO: Verify performance impact
+     *
+     * @return
+     */
+    public Set<String> getFilterFields() {
+        return filterFields;
+    }
+
+    /**
+     * Just add filter fields for expression filter
+     */
+    private Set<String> filterFields;
+
+    public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
+        this(ed, orExpr, false);
+    }
+
+    public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) {
+        this.ed = ed;
+        this.orExpr = orExpr;
+        this.filterIfMissing = filterIfMissing;
+    }
+
+    public void setCharset(String charsetName) {
+        charset = Charset.forName(charsetName);
+    }
+
+    public Charset getCharset() {
+        return charset;
+    }
+
+    /**
+     * Because we don't have metadata for tag, we regard a non-qualifier field as a tag. So one field
+     * possibly is not a real tag when this function returns true. This happens when a user inputs a wrong
+     * field name which is neither tag nor qualifier
+     *
+     * @param field
+     */
+    private boolean isTag(String field) {
+        return ed.isTag(field);
+    }
+
+    /**
+     * check whether this field is one entity attribute or not
+     *
+     * @param fieldName
+     * @return
+     */
+    private String parseEntityAttribute(String fieldName) {
+        Matcher m = _fnPattern.matcher(fieldName);
+        if (m.find()) {
+            return m.group(1);
+        }
+        return null;
+    }
+
+    /**
+     * Return the partition values for each or expression. The size of the returned list should be equal to
+     * the size of FilterList that {@link #buildFilters()} returns. TODO: For now we don't support one query
+     * to query multiple partitions. In future if partition is defined, for the entity, internally We need to
+     * spawn multiple queries and send one query for each partition.
+     *
+     * @return Return the partition values for each or expression. Return null if the entity doesn't support
+     *         partition
+     */
+    public List<String[]> getPartitionValues() {
+        final String[] partitions = ed.getPartitions();
+        if (partitions == null || partitions.length == 0) {
+            return null;
+        }
+        final List<String[]> result = new ArrayList<String[]>();
+        final Map<String, String> partitionKeyValueMap = new HashMap<String, String>();
+        for (ANDExpression andExpr : orExpr.getANDExprList()) {
+            partitionKeyValueMap.clear();
+            for (AtomicExpression ae : andExpr.getAtomicExprList()) {
+                // TODO temporarily ignore those fields which are not for attributes
+                if (ae.getKeyType() == TokenType.ID) {
+                    final String fieldName = parseEntityAttribute(ae.getKey());
+                    if (fieldName == null) {
+                        LOG.warn(fieldName + " field does not have format @<FieldName>, ignored");
+                        continue;
+                    }
+                    if (ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) {
+                        final String value = ae.getValue();
+                        partitionKeyValueMap.put(fieldName, value);
+                    }
+                }
+            }
+            final String[] values = new String[partitions.length];
+            result.add(values);
+            for (int i = 0; i < partitions.length; ++i) {
+                final String partition = partitions[i];
+                final String value = partitionKeyValueMap.get(partition);
+                values[i] = value;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @see org.apache.eagle.query.parser.TokenType
+     * @return
+     */
+    public FilterList buildFilters() {
+        // TODO: Optimize to select between row filter or column filter for better performance
+        // Use row key filter priority by default
+        boolean rowFilterPriority = true;
+
+        FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
+        for (ANDExpression andExpr : orExpr.getANDExprList()) {
+
+            FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+            Map<String, List<String>> tagFilters = new HashMap<String, List<String>>();
+            List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>();
+            // List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>();
+
+            // TODO refactor not to use too much if/else
+            for (AtomicExpression ae : andExpr.getAtomicExprList()) {
+                // TODO temporarily ignore those fields which are not for attributes
+
+                String fieldName = ae.getKey();
+                if (ae.getKeyType() == TokenType.ID) {
+                    fieldName = parseEntityAttribute(fieldName);
+                    if (fieldName == null) {
+                        LOG.warn(fieldName + " field does not have format @<FieldName>, ignored");
+                        continue;
+                    }
+                }
+
+                String value = ae.getValue();
+                ComparisonOperator op = ae.getOp();
+                TokenType keyType = ae.getKeyType();
+                TokenType valueType = ae.getValueType();
+                QualifierFilterEntity entry = new QualifierFilterEntity(fieldName, value, op, keyType,
+                                                                        valueType);
+
+                // TODO Exact match, need to add escape for those special characters here, including:
+                // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|"
+
+                if (keyType == TokenType.ID && isTag(fieldName)) {
+                    if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
+                        && !TokenType.NULL.equals(valueType)) {
+                        // Use RowFilter for equal TAG
+                        if (tagFilters.get(fieldName) == null) {
+                            tagFilters.put(fieldName, new ArrayList<String>());
+                        }
+                        tagFilters.get(fieldName).add(value);
+                    } else if (rowFilterPriority && ComparisonOperator.IN.equals(op)) {
+                        // Use RowFilter here by default
+                        if (tagFilters.get(fieldName) == null) {
+                            tagFilters.put(fieldName, new ArrayList<String>());
+                        }
+                        tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value));
+                    } else if (ComparisonOperator.LIKE.equals(op) || ComparisonOperator.NOT_LIKE.equals(op)
+                               || ComparisonOperator.CONTAINS.equals(op)
+                               || ComparisonOperator.NOT_CONTAINS.equals(op)
+                               || ComparisonOperator.IN.equals(op) || ComparisonOperator.IS.equals(op)
+                               || ComparisonOperator.IS_NOT.equals(op)
+                               || ComparisonOperator.NOT_EQUAL.equals(op)
+                               || ComparisonOperator.EQUAL.equals(op)
+                               || ComparisonOperator.NOT_IN.equals(op)) {
+                        qualifierFilters.add(entry);
+                    } else {
+                        LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName
+                                 + " yet, going to ignore");
+                        throw new IllegalArgumentException("Don't support operation: " + op
+                                                           + " on tag field: " + fieldName
+                                                           + ", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains");
+                    }
+                } else {
+                    qualifierFilters.add(entry);
+                }
+            }
+
+            // Build RowFilter for equal tags
+            list.addFilter(buildTagFilter(tagFilters));
+
+            // Build SingleColumnValueFilter
+            FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters);
+            if (qualifierFilterList != null && qualifierFilterList.getFilters().size() > 0) {
+                list.addFilter(qualifierFilterList);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignore empty qualifier filter from " + qualifierFilters.toString());
+                }
+            }
+            fltList.addFilter(list);
+        }
+        LOG.info("Query: " + orExpr.toString() + " => Filter: " + fltList.toString());
+        return fltList;
+    }
+
+    /**
+     * charset is used to decode the byte array, in hbase server, RegexStringComparator uses the same charset
+     * to decode the byte array stored in qualifier for tag filter regex, it's always ISO-8859-1 as it only
+     * comes from String's hashcode (Integer) Note: regex comparison is to compare String
+     */
+    protected Filter buildTagFilter(Map<String, List<String>> tagFilters) {
+        RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters));
+        regexStringComparator.setCharset(charset);
+        RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator);
+        return filter;
+    }
+
+    /**
+     * all qualifiers' condition must be satisfied.
+     * <H1>Use RegexStringComparator for:</H1> IN LIKE NOT_LIKE
+     * <H1>Use SubstringComparator for:</H1> CONTAINS
+     * <H1>Use EntityQualifierHelper for:</H1> EQUALS NOT_EUQALS LESS LESS_OR_EQUAL GREATER GREATER_OR_EQUAL
+     * <H2>TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper</H2>
+     *
+     * @param qualifierFilters
+     * @return
+     */
+    protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters) {
+        FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+        // iterate all the qualifiers
+        for (QualifierFilterEntity entry : qualifierFilters) {
+            // if contains expression based filter
+            if (entry.getKeyType() == TokenType.EXP || entry.getValueType() == TokenType.EXP
+                || entry.getKeyType() != TokenType.ID) {
+                if (!EagleConfigFactory.load().isCoprocessorEnabled()) {
+                    LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: "
+                             + entry.toString());
+                }
+                list.addFilter(buildExpressionBasedFilter(entry));
+                continue;
+            }
+
+            // else using SingleColumnValueFilter
+            String qualifierName = entry.getKey();
+            if (!isTag(entry.getKey())) {
+                Qualifier qualifier = ed.getDisplayNameMap().get(entry.getKey());
+                qualifierName = qualifier.getQualifierName();
+            }
+
+            // Comparator to be used for building HBase Filter
+            // WritableByteArrayComparable comparator;
             ByteArrayComparable comparable;
-                       if(ComparisonOperator.IN.equals(entry.getOp())
-                               || ComparisonOperator.NOT_IN.equals(entry.getOp())){
-                               Filter setFilter = buildListQualifierFilter(entry);
-                               if(setFilter!=null){
-                                       list.addFilter(setFilter);
-                               }
-                       }else{
-                               // If [=,!=,is,is not] NULL, use NullComparator else throw exception
-                               if(TokenType.NULL.equals(entry.getValueType())){
-                                       if(ComparisonOperator.EQUAL.equals(entry.getOp())
-                                               ||ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
-                                               ||ComparisonOperator.IS.equals(entry.getOp())
-                                               ||ComparisonOperator.IS_NOT.equals(entry.getOp()))
+            if (ComparisonOperator.IN.equals(entry.getOp())
+                || ComparisonOperator.NOT_IN.equals(entry.getOp())) {
+                Filter setFilter = buildListQualifierFilter(entry);
+                if (setFilter != null) {
+                    list.addFilter(setFilter);
+                }
+            } else {
+                // If [=,!=,is,is not] NULL, use NullComparator else throw exception
+                if (TokenType.NULL.equals(entry.getValueType())) {
+                    if (ComparisonOperator.EQUAL.equals(entry.getOp())
+                        || ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
+                        || ComparisonOperator.IS.equals(entry.getOp())
+                        || ComparisonOperator.IS_NOT.equals(entry.getOp())) {
                         comparable = new NullComparator();
-                                       else
-                                               throw new IllegalArgumentException("Operation: "+entry.getOp()+" with NULL is not supported yet: "+entry.toString()+", avaliable options: [=, !=, is, is not] null|NULL");
-                               }
-                               // If [contains, not contains],use SubstringComparator
-                               else if (ComparisonOperator.CONTAINS.equals(entry.getOp())
-                                       || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+                    } else {
+                        throw new IllegalArgumentException("Operation: " + entry.getOp()
+                                                           + " with NULL is not supported yet: "
+                                                           + entry.toString()
+                                                           + ", avaliable options: [=, !=, is, is not] null|NULL");
+                    }
+                } else if (ComparisonOperator.CONTAINS.equals(entry.getOp())
+                         || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+                    // If [contains, not contains],use SubstringComparator
                     comparable = new SubstringComparator(entry.getValue());
-                               }
-                               // If [like, not like], use RegexStringComparator
-                               else if (ComparisonOperator.LIKE.equals(entry.getOp())
-                                               || ComparisonOperator.NOT_LIKE.equals(entry.getOp())){
-                                       // Use RegexStringComparator for LIKE / NOT_LIKE
-                                       RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry.getValue()));
-                                       _comparator.setCharset(_charset);
+                } else if (ComparisonOperator.LIKE.equals(entry.getOp())
+                         || ComparisonOperator.NOT_LIKE.equals(entry.getOp())) {
+                    // If [like, not like], use RegexStringComparator
+                    // Use RegexStringComparator for LIKE / NOT_LIKE
+                    RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry
+                        .getValue()));
+                    _comparator.setCharset(charset);
                     comparable = _comparator;
-                               } else{
-                                       Class type = EntityQualifierUtils.getType(_ed, entry.getKey());
-                                       // if type is null (is Tag or not found) or not defined for TypedByteArrayComparator
-                                       if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || TypedByteArrayComparator.get(type) == null){
-                        comparable = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()));
-                                       }else {
-                        comparable = new TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()),type);
-                                       }
-                               }
-
-                               SingleColumnValueFilter filter =
-                                               new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable);
-                               filter.setFilterIfMissing(_filterIfMissing);
-                               list.addFilter(filter);
-                       }
-               }
-
-               return list;
-       }
-
-       private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) {
-               BooleanExpressionComparator expressionComparator  = new BooleanExpressionComparator(entry,_ed);
-               _filterFields = expressionComparator.getRequiredFields();
-               RowValueFilter filter = new RowValueFilter(expressionComparator);
-               return filter;
-       }
-
-       /**
-        * Currently use BinaryComparator only
-        * <h2>TODO: </h2>
-        * Possibility to tune performance by using: OR[BinaryComparator,...] instead of RegexStringComparator?
-        *
-        *<br/> <br/>
-        *
-        * ! Check op must be IN or NOTIN in caller
-    &nbs