[EAGLE-1080] Fix checkstyle errors in the eagle-query-base module
authorColm O hEigeartaigh <coheigea@apache.org>
Wed, 7 Feb 2018 07:05:37 +0000 (23:05 -0800)
committerJay Sen <jsenjaliya@paypal.com>
Wed, 7 Feb 2018 07:05:37 +0000 (23:05 -0800)
<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[EAGLE-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
       number, if there is one.
 - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

Author: Colm O hEigeartaigh <coheigea@apache.org>

Closes #983 from coheigea/eagle-query-base-checkstyle.

63 files changed:
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/ListQueryCompiler.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/QueryConstants.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntity.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateAPIEntityFactory.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateCondition.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionNotSupportedException.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionType.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateFunctionTypeMatcher.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParams.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateParamsValidator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/AggregateResultAPIEntity.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/Aggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/BucketQuery.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/TestHBaseLogReader2.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/TestListQueryCompiler.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/raw/TestGroupbyKey.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/raw/TestRawAggregator.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/raw/TestRawHBaseLogReaderAndAgg.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestAggregator.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestAlertAggService.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestBucketQuery.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestBucketQuery2.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestFlatAggregator.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestGroupbyFieldComparator.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestHierarchicalAggregator.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestListQueryCompiler.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestPostFlatAggregateSort.java
eagle-core/eagle-query/eagle-query-base/src/test/java/org/apache/eagle/query/aggregate/test/TestTimeSeriesAggregator.java
eagle-dev/checkstyle-suppressions.xml

index 1319f43..e6e56d6 100755 (executable)
@@ -25,50 +25,56 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * @since : 10/30/14,2014
+ * @since : 10/30/14,2014.
  */
 public class GenericEntityQuery implements GenericQuery,EntityCreationListener {
-       private static final Logger LOG = LoggerFactory.getLogger(GenericEntityQuery.class);
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityQuery.class);
 
-       private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
-       private StreamReader reader;
+    private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+    private StreamReader reader;
 
-       public GenericEntityQuery(String serviceName, SearchCondition condition, String metricName) throws IllegalAccessException, InstantiationException {
-               if(serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){
-                       if(LOG.isDebugEnabled()) LOG.debug("List metric query");
-                       if(metricName == null || metricName.isEmpty()){
-                               throw new IllegalArgumentException("metricName should not be empty for metric list query");
-                       }
-                       if(!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)){
-                               condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
-                       }
-                       reader = new GenericEntityStreamReader(serviceName, condition,metricName);
-               }else{
-                       if(LOG.isDebugEnabled()) LOG.debug("List entity query");
-                       reader = new GenericEntityStreamReader(serviceName, condition);
-               }
-               reader.register(this);
-       }
+    public GenericEntityQuery(String serviceName, SearchCondition condition, String metricName) throws IllegalAccessException, InstantiationException {
+        if (serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("List metric query");
+            }
+            if (metricName == null || metricName.isEmpty()) {
+                throw new IllegalArgumentException("metricName should not be empty for metric list query");
+            }
+            if (!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)) {
+                condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
+            }
+            reader = new GenericEntityStreamReader(serviceName, condition,metricName);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("List entity query");
+            }
+            reader = new GenericEntityStreamReader(serviceName, condition);
+        }
+        reader.register(this);
+    }
 
-       @Override
-       public long getLastTimestamp() {
-               return reader.getLastTimestamp();
-       }
+    @Override
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
 
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity){
-               entities.add(entity);
-       }
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+    }
 
-       @Override
-       public List<TaggedLogAPIEntity> result() throws Exception{
-               if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
-               reader.readAsStream();
-               return entities;
-       }
+    @Override
+    public List<TaggedLogAPIEntity> result() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start reading as batch mode");
+        }
+        reader.readAsStream();
+        return entities;
+    }
 
-       @Override
-       public long getFirstTimeStamp() {
-               return reader.getFirstTimestamp();
-       }
+    @Override
+    public long getFirstTimeStamp() {
+        return reader.getFirstTimestamp();
+    }
 }
\ No newline at end of file
index d3af151..698de4e 100755 (executable)
@@ -19,28 +19,28 @@ package org.apache.eagle.query;
 import java.util.List;
 
 /**
- * @since : 10/30/14,2014
+ * @since : 10/30/14,2014.
  */
 public interface GenericQuery {
-       /**
-        * Throw all exceptions to http server
-        *
+    /**
+     * Throw all exceptions to http server.
+     *
      * @param <T> result entity type
-        * @return result entities list
-        *
+     * @return result entities list
+     *
      * @throws Exception
-        */
-       <T> List<T> result() throws Exception;
+     */
+    <T> List<T> result() throws Exception;
 
-       /**
-        * Get last/largest timestamp on all rows
-        *
-        * @return last timestamp
-        */
-       long getLastTimestamp();
+    /**
+     * Get last/largest timestamp on all rows.
+     *
+     * @return last timestamp
+     */
+    long getLastTimestamp();
 
-       /**
-        * Get first timestamp on all rows
-        */
-       long getFirstTimeStamp();
+    /**
+     * Get first timestamp on all rows.
+     */
+    long getFirstTimeStamp();
 }
\ No newline at end of file
index ab01064..b83818b 100755 (executable)
@@ -38,343 +38,364 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class ListQueryCompiler {
-       private final static Logger LOG = LoggerFactory.getLogger(ListQueryCompiler.class);
-       /**
-        * syntax is <EntityName>[<Filter>]{<Projection>}
-        */
-       private final static String listRegex = "^([^\\[]+)\\[(.*)\\]\\{(.+)\\}$";
-       private final static Pattern _listPattern = Pattern.compile(listRegex);
-
-       /**
-        * syntax is @<fieldname>
-        */
-       private final static String _fnAnyPattern = "*";
-       private final static Pattern _fnPattern = TokenConstant.ID_PATTERN;
-
-       /**
-        * syntax is @<expression>
-        */
-       private final static String expRegex = "^(EXP\\{.*\\})(\\s+AS)?(\\s+.*)?$";
-       private final static Pattern _expPattern = Pattern.compile(expRegex,Pattern.CASE_INSENSITIVE);
-
-       /**
-        * syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}
-        */
-
-       /** The regular expression before add EXP{<Expression>} in query **/
-       private final static String aggRegex = "^([^\\[]+)\\[(.*)\\]<([^>]*)>\\{(.+)\\}$";
-       private final static Pattern _aggPattern = Pattern.compile(aggRegex);
-
-       private final static String sortRegex = "^([^\\[]+)\\[(.*)\\]<([^>]*)>\\{(.+)\\}\\.\\{(.+)\\}$";
-       private final static Pattern _sortPattern = Pattern.compile(sortRegex);
-       
-       private String _serviceName;
-       private Filter _filter;
-       private List<String> _outputFields;
-       private List<String> _groupbyFields;
-       private List<AggregateFunctionType> _aggregateFunctionTypes;
-       private List<String> _aggregateFields;
-       private List<AggregateFunctionType> _sortFunctionTypes;
-       private List<String> _sortFields;
-       private Map<String,String> _outputAlias;
-
-       /**
-        * Filed that must be required in filter
-        *
-        * @return
-        */
-       public Set<String> getFilterFields() {
-               return _filterFields;
-       }
-
-       private Set<String> _filterFields;
-       private List<SortOption> _sortOptions;
-       private boolean _hasAgg;
-       private List<String[]> _partitionValues;
-       private boolean _filterIfMissing;
-       private ORExpression _queryExpression;
-       private boolean _outputAll = false;
-
-       public ListQueryCompiler(String query) throws Exception {
-               this(query, false);
-       }
-       
-       public ListQueryCompiler(String query, boolean filterIfMissing) throws Exception{
-               this._filterIfMissing = filterIfMissing;
-               Matcher m = _listPattern.matcher(query);
-               if(m.find()){
-                       if(m.groupCount() != 3)
-                               throw new IllegalArgumentException("List query syntax is <EntityName>[<Filter>]{<Projection>}");
-                       compileCollectionQuery(m);
-                       _hasAgg = false;
-                       partitionConstraintValidate(query);
-                       return;
-               }
-               
-               /** match sort pattern fist, otherwise some sort query will be mismatch as agg pattern */
-               m = _sortPattern.matcher(query);
-               if(m.find()){
-                       if(m.groupCount() != 5)
-                               throw new IllegalArgumentException("Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
-                       compileAggregateQuery(m);
-                       _hasAgg = true;
-                       partitionConstraintValidate(query);
-                       return;
-               }
-               
-               m = _aggPattern.matcher(query);
-               if(m.find()){
-                       if(m.groupCount() != 4)
-                       //if(m.groupCount() < 4 || m.groupCount() > 5)
-                               throw new IllegalArgumentException("Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
-                       compileAggregateQuery(m);
-                       _hasAgg = true;
-                       partitionConstraintValidate(query);
-                       return;
-               }
-               
-               throw new IllegalArgumentException("List query syntax is <EntityName>[<Filter>]{<Projection>} \n Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
-       }
-       
-       /**
-        * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined 
-        * for the entity, internally We need to spawn multiple queries and send one query for each search condition 
-        * for each partition
-        * 
-        * @param query input query to compile
-        */
-       private void partitionConstraintValidate(String query) {
-               if (_partitionValues != null && _partitionValues.size() > 1) {
-                       final String[] values = _partitionValues.get(0);
-                       for (int i = 1; i < _partitionValues.size(); ++i) {
-                               final String[] tmpValues = _partitionValues.get(i);
-                               for (int j = 0; j < values.length; ++j) {
-                                       if (values[j] == null || (!values[j].equals(tmpValues[j]))) {
-                                               final String errMsg = "One query for multiple partitions is NOT allowed for now! Query: " + query;
-                                               LOG.error(errMsg);
-                                               throw new IllegalArgumentException(errMsg);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       public boolean hasAgg(){
-               return _hasAgg;
-       }
-       
-       public List<String[]> getQueryPartitionValues() {
-               return _partitionValues;
-       }
-       
-       public ORExpression getQueryExpression() {
-               return _queryExpression;
-       }
-       
-       private void checkEntityExistence(String entityName) throws EagleQueryParseException {
-               try {
-                       if(EntityDefinitionManager.getEntityByServiceName(entityName) == null)
-                               throw new EagleQueryParseException(entityName + " entity does not exist!");
-               } catch (InstantiationException e) {
-                       final String errMsg = "Got an InstantiationException: " + e.getMessage();
-                       throw new EagleQueryParseException(entityName + " entity does not exist! " + errMsg);
-               } catch (IllegalAccessException e) {
-                       final String errMsg = "Got an IllegalAccessException: " + e.getMessage();
-                       throw new EagleQueryParseException(entityName + " entity does not exist! " + errMsg);
-               }
-       }
-       
-       public String deleteAtSign(String expression) {
-               return expression.replace("@", "");
-       }
-       
-       private void compileCollectionQuery(Matcher m) throws EagleQueryParseException{
-               _serviceName = m.group(1);
-               checkEntityExistence(_serviceName);
-               if(_outputFields==null) _outputFields = new ArrayList<String>();
-               String qy = m.group(2);
-               _filter = compileQy(qy);
-               String prjFields = m.group(3);
-               String[] tmp = prjFields.split(",");
-               for(String str : tmp){
-                       str = str.trim();
-                       Matcher fnMatcher = _fnPattern.matcher(str);
-                       Matcher expMatcher = _expPattern.matcher(str);
-                       if(fnMatcher.find()) {
-                               if (fnMatcher.groupCount() == 1)
-                                       _outputFields.add(fnMatcher.group(1));                          
-                       }else if(_fnAnyPattern.equals(str)){
-                               if(LOG.isDebugEnabled()) LOG.debug("Output all fields");
-                               // _outputFields.add(_fnAnyPattern);
-                               this._outputAll = true;
-                       }else if (expMatcher.find()) {
-                               String expr = deleteAtSign(expMatcher.group(1));
-                               String alias = expMatcher.group(3);
-                               try {
-                                       String exprContent = TokenConstant.parseExpressionContent(expr);
-                                       _outputFields.addAll(ExpressionParser.parse(exprContent).getDependentFields());
-                                       if(alias!=null) {
-                                               if(_outputAlias == null) _outputAlias = new HashMap<String, String>();
-                                               _outputAlias.put(exprContent,alias.trim());
-                                       }
-                               } catch (Exception ex){
-                                       LOG.error("Failed to parse expression: " + expr + ", exception: " + ex.getMessage(), ex);
-                               } finally {
-                                       _outputFields.add(expr);
-                               }
-                       } else {
-                               throw new IllegalArgumentException("Field name syntax must be @<FieldName> or * or Expression in syntax EXP{<Expression>}");
-                       }
-               }
-       }
-       
-       private void compileAggregateQuery(Matcher m) throws EagleQueryParseException{
-               _serviceName = m.group(1);
-               checkEntityExistence(_serviceName);
-               String qy = m.group(2);
-               _filter = compileQy(qy);
-               String groupbyFields = m.group(3);
-               // groupbyFields could be empty
-               List<String> groupbyFieldList = null;
-               _groupbyFields = new ArrayList<String>();
-               if(!groupbyFields.isEmpty()){
-                       groupbyFieldList = Arrays.asList(groupbyFields.split(","));
-                       for(String str : groupbyFieldList){
-                               Matcher fnMatcher = _fnPattern.matcher(str.trim());
-                               if(!fnMatcher.find() || fnMatcher.groupCount() != 1)
-                                       throw new IllegalArgumentException("Field name syntax must be @<FieldName>");
-                               _groupbyFields.add(fnMatcher.group(1));
-                       }
-               }
-               String functions = m.group(4);
-               // functions
-               List<String> functionList = Arrays.asList(functions.split(","));
-               _aggregateFunctionTypes = new ArrayList<AggregateFunctionType>();
-               _aggregateFields = new ArrayList<String>();
-               for(String function : functionList){
-                       AggregateFunctionTypeMatcher matcher = AggregateFunctionType.matchAll(function.trim());
-                       if(!matcher.find()){
-                               throw new IllegalArgumentException("Aggregate function must have format of count|sum|avg|max|min(<fieldname|expression>)");
-                       }
-                       _aggregateFunctionTypes.add(matcher.type());
-                       String aggField = deleteAtSign(matcher.field().trim());
-                       try {
-                               if(_outputFields == null) _outputFields = new ArrayList<String>();
-                               if(TokenConstant.isExpression(aggField)) {
-                                       _outputFields.addAll(ExpressionParser.parse(TokenConstant.parseExpressionContent(aggField)).getDependentFields());
-                               }else{
-                                       _outputFields.add(aggField);
-                               }
-                       } catch (Exception ex){
-                               LOG.error("Failed to parse expression: " + aggField + ", exception: " + ex.getMessage(), ex);
-                       } finally {
-                               _aggregateFields.add(aggField);
-                       }
-               }
-               
-               // sort options
-               if(m.groupCount() < 5 || m.group(5) == null) // no sort options
-                       return;
-               String sortOptions = m.group(5);
-               if(sortOptions != null){
-                       LOG.info("SortOptions: " + sortOptions);
-                       List<String> sortOptionList = Arrays.asList(sortOptions.split(","));
-                       List<String> rawSortFields = new ArrayList<String>();
-                       this._sortOptions = SortOptionsParser.parse(groupbyFieldList, functionList, sortOptionList, rawSortFields);
-                       this._sortFunctionTypes = new ArrayList<>();
-                       this._sortFields = new ArrayList<>();
-                       for (String sortField : rawSortFields) {
-                               AggregateFunctionTypeMatcher matcher = AggregateFunctionType.matchAll(sortField);
-                               if(matcher.find()) {
-                                       _sortFunctionTypes.add(matcher.type());
-                                       _sortFields.add(deleteAtSign(matcher.field().trim()));
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * 1. syntax level - use antlr to pass the queries
-        * 2. semantics level - can't distinguish tag or qualifier
-        * @param qy
-        * @return
-        */
-       private Filter compileQy(String qy) throws EagleQueryParseException{
-               try {
-                       EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(_serviceName);
-                       if(qy == null || qy.isEmpty()){
-                               if (ed.getPartitions() == null) {
-                                       if(LOG.isDebugEnabled()) LOG.warn("Query string is empty, full table scan query: " + qy);
-                                       // For hbase 0.98+, empty FilterList() will filter all rows, so we need return null instead
+    private static final Logger LOG = LoggerFactory.getLogger(ListQueryCompiler.class);
+    /*
+     * syntax is <EntityName>[<Filter>]{<Projection>}
+     */
+    private static final String listRegex = "^([^\\[]+)\\[(.*)\\]\\{(.+)\\}$";
+    private static final Pattern listPattern = Pattern.compile(listRegex);
+
+    /*
+     * syntax is @<fieldname>
+     */
+    private static final String fnAnyPattern = "*";
+    private static final Pattern fnPattern = TokenConstant.ID_PATTERN;
+
+    /*
+     * syntax is @<expression>
+     */
+    private static final String expRegex = "^(EXP\\{.*\\})(\\s+AS)?(\\s+.*)?$";
+    private static final Pattern expPattern = Pattern.compile(expRegex,Pattern.CASE_INSENSITIVE);
+
+    /*
+     * syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}
+     */
+
+    /** The regular expression before add EXP{<Expression>} in query **/
+    private static final String aggRegex = "^([^\\[]+)\\[(.*)\\]<([^>]*)>\\{(.+)\\}$";
+    private static final Pattern aggPattern = Pattern.compile(aggRegex);
+
+    private static final String sortRegex = "^([^\\[]+)\\[(.*)\\]<([^>]*)>\\{(.+)\\}\\.\\{(.+)\\}$";
+    private static final Pattern sortPattern = Pattern.compile(sortRegex);
+
+    private String serviceName;
+    private Filter filter;
+    private List<String> outputFields;
+    private List<String> groupbyFields;
+    private List<AggregateFunctionType> aggregateFunctionTypes;
+    private List<String> aggregateFields;
+    private List<AggregateFunctionType> sortFunctionTypes;
+    private List<String> sortFields;
+    private Map<String,String> outputAlias;
+
+    /**
+     * Filed that must be required in filter
+     *
+     * @return
+     */
+    public Set<String> getFilterFields() {
+        return filterFields;
+    }
+
+    private Set<String> filterFields;
+    private List<SortOption> sortOptions;
+    private boolean hasAgg;
+    private List<String[]> partitionValues;
+    private boolean filterIfMissing;
+    private ORExpression queryExpression;
+    private boolean outputAll = false;
+
+    public ListQueryCompiler(String query) throws Exception {
+        this(query, false);
+    }
+
+    public ListQueryCompiler(String query, boolean filterIfMissing) throws Exception {
+        this.filterIfMissing = filterIfMissing;
+        Matcher m = listPattern.matcher(query);
+        if (m.find()) {
+            if (m.groupCount() != 3) {
+                throw new IllegalArgumentException("List query syntax is <EntityName>[<Filter>]{<Projection>}");
+            }
+            compileCollectionQuery(m);
+            hasAgg = false;
+            partitionConstraintValidate(query);
+            return;
+        }
+
+        /** match sort pattern fist, otherwise some sort query will be mismatch as agg pattern */
+        m = sortPattern.matcher(query);
+        if (m.find()) {
+            if (m.groupCount() != 5) {
+                throw new IllegalArgumentException("Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
+            }
+            compileAggregateQuery(m);
+            hasAgg = true;
+            partitionConstraintValidate(query);
+            return;
+        }
+
+        m = aggPattern.matcher(query);
+        if (m.find()) {
+            if (m.groupCount() != 4) {
+                //if(m.groupCount() < 4 || m.groupCount() > 5)
+                throw new IllegalArgumentException("Aggregate query syntax is <EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
+            }
+            compileAggregateQuery(m);
+            hasAgg = true;
+            partitionConstraintValidate(query);
+            return;
+        }
+
+        throw new IllegalArgumentException("List query syntax is <EntityName>[<Filter>]{<Projection>} \n Aggregate query syntax is "
+                                           + "<EntityName>[<Filter>]<GroupbyFields>{<AggregateFunctions>}.{<SortOptions>}");
+    }
+
+    /**
+     * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined
+     * for the entity, internally We need to spawn multiple queries and send one query for each search condition
+     * for each partition
+     *
+     * @param query input query to compile
+     */
+    private void partitionConstraintValidate(String query) {
+        if (partitionValues != null && partitionValues.size() > 1) {
+            final String[] values = partitionValues.get(0);
+            for (int i = 1; i < partitionValues.size(); ++i) {
+                final String[] tmpValues = partitionValues.get(i);
+                for (int j = 0; j < values.length; ++j) {
+                    if (values[j] == null || (!values[j].equals(tmpValues[j]))) {
+                        final String errMsg = "One query for multiple partitions is NOT allowed for now! Query: " + query;
+                        LOG.error(errMsg);
+                        throw new IllegalArgumentException(errMsg);
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasAgg() {
+        return hasAgg;
+    }
+
+    public List<String[]> getQueryPartitionValues() {
+        return partitionValues;
+    }
+
+    public ORExpression getQueryExpression() {
+        return queryExpression;
+    }
+
+    private void checkEntityExistence(String entityName) throws EagleQueryParseException {
+        try {
+            if (EntityDefinitionManager.getEntityByServiceName(entityName) == null) {
+                throw new EagleQueryParseException(entityName + " entity does not exist!");
+            }
+        } catch (InstantiationException e) {
+            final String errMsg = "Got an InstantiationException: " + e.getMessage();
+            throw new EagleQueryParseException(entityName + " entity does not exist! " + errMsg);
+        } catch (IllegalAccessException e) {
+            final String errMsg = "Got an IllegalAccessException: " + e.getMessage();
+            throw new EagleQueryParseException(entityName + " entity does not exist! " + errMsg);
+        }
+    }
+
+    public String deleteAtSign(String expression) {
+        return expression.replace("@", "");
+    }
+
+    private void compileCollectionQuery(Matcher m) throws EagleQueryParseException {
+        serviceName = m.group(1);
+        checkEntityExistence(serviceName);
+        if (outputFields == null) {
+            outputFields = new ArrayList<String>();
+        }
+        String qy = m.group(2);
+        filter = compileQy(qy);
+        String prjFields = m.group(3);
+        String[] tmp = prjFields.split(",");
+        for (String str : tmp) {
+            str = str.trim();
+            Matcher fnMatcher = fnPattern.matcher(str);
+            Matcher expMatcher = expPattern.matcher(str);
+            if (fnMatcher.find()) {
+                if (fnMatcher.groupCount() == 1) {
+                    outputFields.add(fnMatcher.group(1));
+                }
+            } else if (fnAnyPattern.equals(str)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Output all fields");
+                }
+                // outputFields.add(fnAnyPattern);
+                this.outputAll = true;
+            } else if (expMatcher.find()) {
+                String expr = deleteAtSign(expMatcher.group(1));
+                String alias = expMatcher.group(3);
+                try {
+                    String exprContent = TokenConstant.parseExpressionContent(expr);
+                    outputFields.addAll(ExpressionParser.parse(exprContent).getDependentFields());
+                    if (alias != null) {
+                        if (outputAlias == null) {
+                            outputAlias = new HashMap<String, String>();
+                        }
+                        outputAlias.put(exprContent,alias.trim());
+                    }
+                } catch (Exception ex) {
+                    LOG.error("Failed to parse expression: " + expr + ", exception: " + ex.getMessage(), ex);
+                } finally {
+                    outputFields.add(expr);
+                }
+            } else {
+                throw new IllegalArgumentException("Field name syntax must be @<FieldName> or * or Expression in syntax EXP{<Expression>}");
+            }
+        }
+    }
+
+    private void compileAggregateQuery(Matcher m) throws EagleQueryParseException {
+        serviceName = m.group(1);
+        checkEntityExistence(serviceName);
+        String qy = m.group(2);
+        filter = compileQy(qy);
+        String groupbyField = m.group(3);
+        // groupbyFields could be empty
+        List<String> groupbyFieldList = null;
+        groupbyFields = new ArrayList<String>();
+        if (!groupbyField.isEmpty()) {
+            groupbyFieldList = Arrays.asList(groupbyField.split(","));
+            for (String str : groupbyFieldList) {
+                Matcher fnMatcher = fnPattern.matcher(str.trim());
+                if (!fnMatcher.find() || fnMatcher.groupCount() != 1) {
+                    throw new IllegalArgumentException("Field name syntax must be @<FieldName>");
+                }
+                groupbyFields.add(fnMatcher.group(1));
+            }
+        }
+        String functions = m.group(4);
+        // functions
+        List<String> functionList = Arrays.asList(functions.split(","));
+        aggregateFunctionTypes = new ArrayList<AggregateFunctionType>();
+        aggregateFields = new ArrayList<String>();
+        for (String function : functionList) {
+            AggregateFunctionTypeMatcher matcher = AggregateFunctionType.matchAll(function.trim());
+            if (!matcher.find()) {
+                throw new IllegalArgumentException("Aggregate function must have format of count|sum|avg|max|min(<fieldname|expression>)");
+            }
+            aggregateFunctionTypes.add(matcher.type());
+            String aggField = deleteAtSign(matcher.field().trim());
+            try {
+                if (outputFields == null) {
+                    outputFields = new ArrayList<String>();
+                }
+                if (TokenConstant.isExpression(aggField)) {
+                    outputFields.addAll(ExpressionParser.parse(TokenConstant.parseExpressionContent(aggField)).getDependentFields());
+                } else {
+                    outputFields.add(aggField);
+                }
+            } catch (Exception ex) {
+                LOG.error("Failed to parse expression: " + aggField + ", exception: " + ex.getMessage(), ex);
+            } finally {
+                aggregateFields.add(aggField);
+            }
+        }
+
+        // sort options
+        if (m.groupCount() < 5 || m.group(5) == null) { // no sort options
+            return;
+        }
+        String sortOptions = m.group(5);
+        if (sortOptions != null) {
+            LOG.info("SortOptions: " + sortOptions);
+            List<String> sortOptionList = Arrays.asList(sortOptions.split(","));
+            List<String> rawSortFields = new ArrayList<String>();
+            this.sortOptions = SortOptionsParser.parse(groupbyFieldList, functionList, sortOptionList, rawSortFields);
+            this.sortFunctionTypes = new ArrayList<>();
+            this.sortFields = new ArrayList<>();
+            for (String sortField : rawSortFields) {
+                AggregateFunctionTypeMatcher matcher = AggregateFunctionType.matchAll(sortField);
+                if (matcher.find()) {
+                    sortFunctionTypes.add(matcher.type());
+                    sortFields.add(deleteAtSign(matcher.field().trim()));
+                }
+            }
+        }
+    }
+
+    /**
+     * 1. syntax level - use antlr to pass the queries
+     * 2. semantics level - can't distinguish tag or qualifier
+     * @param qy
+     * @return
+     */
+    private Filter compileQy(String qy) throws EagleQueryParseException {
+        try {
+            EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+            if (qy == null || qy.isEmpty()) {
+                if (ed.getPartitions() == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.warn("Query string is empty, full table scan query: " + qy);
+                    }
+                    // For hbase 0.98+, empty FilterList() will filter all rows, so we need return null instead
                     return null;
-                               } else {
-                                       final String errMsg = "Entity " + ed.getEntityClass().getSimpleName() + " defined partition, "
-                                                       + "but query doesn't provide partition condition! Query: " + qy; 
-                                       LOG.error(errMsg);
-                                       throw new IllegalArgumentException(errMsg);
-                               }
-                       }
-                       EagleQueryParser parser = new EagleQueryParser(qy);
-                       _queryExpression = parser.parse();
-                       
-                       //TODO: build customize filter for EXP{<Expression>}
-                       HBaseFilterBuilder builder = new HBaseFilterBuilder(ed, _queryExpression, _filterIfMissing);
-                       FilterList flist = builder.buildFilters();
-                       _partitionValues = builder.getPartitionValues();
-                       _filterFields = builder.getFilterFields();
-                       return flist;
-               } catch (InstantiationException e) {
-                       final String errMsg = "Got an InstantiationException: " + e.getMessage();
-                       throw new EagleQueryParseException(_serviceName + " entity does not exist! " + errMsg);
-               } catch (IllegalAccessException e) {
-                       final String errMsg = "Got an IllegalAccessException: " + e.getMessage();
-                       throw new EagleQueryParseException(_serviceName + " entity does not exist! " + errMsg);
-               }
-       }
-       
-       public String serviceName(){
-               return _serviceName;
-       }
-       
-       public List<String> outputFields(){
-               return _outputFields;
-       }
-
-       public Filter filter(){
-               return _filter;
-       }
-       
-       public List<String> groupbyFields(){
-               return _groupbyFields;
-       }
-       
-       public List<AggregateFunctionType> aggregateFunctionTypes(){
-               return _aggregateFunctionTypes;
-       }
-       
-       public List<String> aggregateFields(){
-               return _aggregateFields;
-       }
-       
-       public List<SortOption> sortOptions(){
-               return _sortOptions;
-       }
-
-       public List<AggregateFunctionType> sortFunctions() {
-               return _sortFunctionTypes;
-       }
-       
-       public List<String> sortFields() {
-               return _sortFields;
-       }
-
-       /**
-        * Output all fields (i.e. has * in out fields)
-        *
-        * @return
-        */
-       public boolean isOutputAll(){ return _outputAll;}
-       public Map<String,String> getOutputAlias(){
-               return _outputAlias;
-       }
+                } else {
+                    final String errMsg = "Entity " + ed.getEntityClass().getSimpleName() + " defined partition, "
+                        + "but query doesn't provide partition condition! Query: " + qy;
+                    LOG.error(errMsg);
+                    throw new IllegalArgumentException(errMsg);
+                }
+            }
+            EagleQueryParser parser = new EagleQueryParser(qy);
+            queryExpression = parser.parse();
+
+            //TODO: build customize filter for EXP{<Expression>}
+            HBaseFilterBuilder builder = new HBaseFilterBuilder(ed, queryExpression, filterIfMissing);
+            FilterList flist = builder.buildFilters();
+            partitionValues = builder.getPartitionValues();
+            filterFields = builder.getFilterFields();
+            return flist;
+        } catch (InstantiationException e) {
+            final String errMsg = "Got an InstantiationException: " + e.getMessage();
+            throw new EagleQueryParseException(serviceName + " entity does not exist! " + errMsg);
+        } catch (IllegalAccessException e) {
+            final String errMsg = "Got an IllegalAccessException: " + e.getMessage();
+            throw new EagleQueryParseException(serviceName + " entity does not exist! " + errMsg);
+        }
+    }
+
+    public String serviceName() {
+        return serviceName;
+    }
+
+    public List<String> outputFields() {
+        return outputFields;
+    }
+
+    public Filter filter() {
+        return filter;
+    }
+
+    public List<String> groupbyFields() {
+        return groupbyFields;
+    }
+
+    public List<AggregateFunctionType> aggregateFunctionTypes() {
+        return aggregateFunctionTypes;
+    }
+
+    public List<String> aggregateFields() {
+        return aggregateFields;
+    }
+
+    public List<SortOption> sortOptions() {
+        return sortOptions;
+    }
+
+    public List<AggregateFunctionType> sortFunctions() {
+        return sortFunctionTypes;
+    }
+
+    public List<String> sortFields() {
+        return sortFields;
+    }
+
+    /**
+     * Output all fields (i.e. has * in out fields)
+     *
+     * @return
+     */
+    public boolean isOutputAll() {
+        return outputAll;
+    }
+
+    public Map<String,String> getOutputAlias() {
+        return outputAlias;
+    }
 }
index 231cc99..7205568 100644 (file)
@@ -17,8 +17,8 @@
 package org.apache.eagle.query;
 
 /**
- * @since 3/25/15
+ * @since 3/25/15.
  */
 public class QueryConstants {
-    public final static String CHARSET ="UTF-8";
+    public static final String CHARSET = "UTF-8";
 }
index 8081c88..ed7e8af 100644 (file)
@@ -24,46 +24,55 @@ import java.util.TreeMap;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class AggregateAPIEntity {
-       private long numDirectDescendants;
-       private long numTotalDescendants;
-       private String key;
-       private SortedMap<String, AggregateAPIEntity> entityList = new TreeMap<String, AggregateAPIEntity>();
-       private List<AggregateAPIEntity> sortedList = new ArrayList<AggregateAPIEntity>();
+    private long numDirectDescendants;
+    private long numTotalDescendants;
+    private String key;
+    private SortedMap<String, AggregateAPIEntity> entityList = new TreeMap<String, AggregateAPIEntity>();
+    private List<AggregateAPIEntity> sortedList = new ArrayList<AggregateAPIEntity>();
 
-       public String getKey() {
-               return key;
-       }
-       public void setKey(String key) {
-               this.key = key;
-       }
-       @JsonProperty("sL")
-       public List<AggregateAPIEntity> getSortedList() {
-               return sortedList;
-       }
-       public void setSortedList(List<AggregateAPIEntity> sortedList) {
-               this.sortedList = sortedList;
-       }
-       @JsonProperty("eL")
-       public SortedMap<String, AggregateAPIEntity> getEntityList() {
-               return entityList;
-       }
-       public void setEntityList(SortedMap<String, AggregateAPIEntity> entityList) {
-               this.entityList = entityList;
-       }
-       @JsonProperty("nDD")
-       public long getNumDirectDescendants() {
-               return numDirectDescendants;
-       }
-       public void setNumDirectDescendants(long numDirectDescendants) {
-               this.numDirectDescendants = numDirectDescendants;
-       }
-       @JsonProperty("nTD")
-       public long getNumTotalDescendants() {
-               return numTotalDescendants;
-       }
-       public void setNumTotalDescendants(long numTotalDescendants) {
-               this.numTotalDescendants = numTotalDescendants;
-       }
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    @JsonProperty("sL")
+    public List<AggregateAPIEntity> getSortedList() {
+        return sortedList;
+    }
+
+    public void setSortedList(List<AggregateAPIEntity> sortedList) {
+        this.sortedList = sortedList;
+    }
+
+    @JsonProperty("eL")
+    public SortedMap<String, AggregateAPIEntity> getEntityList() {
+        return entityList;
+    }
+
+    public void setEntityList(SortedMap<String, AggregateAPIEntity> entityList) {
+        this.entityList = entityList;
+    }
+
+    @JsonProperty("nDD")
+    public long getNumDirectDescendants() {
+        return numDirectDescendants;
+    }
+
+    public void setNumDirectDescendants(long numDirectDescendants) {
+        this.numDirectDescendants = numDirectDescendants;
+    }
+
+    @JsonProperty("nTD")
+    public long getNumTotalDescendants() {
+        return numTotalDescendants;
+    }
+
+    public void setNumTotalDescendants(long numTotalDescendants) {
+        this.numTotalDescendants = numTotalDescendants;
+    }
 }
\ No newline at end of file
index 5555cfd..c62a05a 100755 (executable)
@@ -23,51 +23,51 @@ import java.util.List;
  *
  * @since : 11/7/14,2014
  */
-public class AggregateCondition implements Serializable{
-       private static final long serialVersionUID = 1L;
-       private List<String> groupbyFields;
-       private List<AggregateFunctionType> aggregateFunctionTypes;
-       private List<String> aggregateFields;
-       private boolean timeSeries;
-       private long intervalMS;
+public class AggregateCondition implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private List<String> groupbyFields;
+    private List<AggregateFunctionType> aggregateFunctionTypes;
+    private List<String> aggregateFields;
+    private boolean timeSeries;
+    private long intervalMS;
 
-       public List<String> getGroupbyFields() {
-               return groupbyFields;
-       }
+    public List<String> getGroupbyFields() {
+        return groupbyFields;
+    }
 
-       public void setGroupbyFields(List<String> groupbyFields) {
-               this.groupbyFields = groupbyFields;
-       }
+    public void setGroupbyFields(List<String> groupbyFields) {
+        this.groupbyFields = groupbyFields;
+    }
 
-       public List<AggregateFunctionType> getAggregateFunctionTypes() {
-               return aggregateFunctionTypes;
-       }
+    public List<AggregateFunctionType> getAggregateFunctionTypes() {
+        return aggregateFunctionTypes;
+    }
 
-       public void setAggregateFunctionTypes(List<AggregateFunctionType> aggregateFunctionTypes) {
-               this.aggregateFunctionTypes = aggregateFunctionTypes;
-       }
+    public void setAggregateFunctionTypes(List<AggregateFunctionType> aggregateFunctionTypes) {
+        this.aggregateFunctionTypes = aggregateFunctionTypes;
+    }
 
-       public List<String> getAggregateFields() {
-               return aggregateFields;
-       }
+    public List<String> getAggregateFields() {
+        return aggregateFields;
+    }
 
-       public void setAggregateFields(List<String> aggregateFields) {
-               this.aggregateFields = aggregateFields;
-       }
+    public void setAggregateFields(List<String> aggregateFields) {
+        this.aggregateFields = aggregateFields;
+    }
 
-       public boolean isTimeSeries() {
-               return timeSeries;
-       }
+    public boolean isTimeSeries() {
+        return timeSeries;
+    }
 
-       public void setTimeSeries(boolean timeSeries) {
-               this.timeSeries = timeSeries;
-       }
+    public void setTimeSeries(boolean timeSeries) {
+        this.timeSeries = timeSeries;
+    }
 
-       public long getIntervalMS() {
-               return intervalMS;
-       }
+    public long getIntervalMS() {
+        return intervalMS;
+    }
 
-       public void setIntervalMS(long intervalMS) {
-               this.intervalMS = intervalMS;
-       }
+    public void setIntervalMS(long intervalMS) {
+        this.intervalMS = intervalMS;
+    }
 }
index df35c8b..09ce4b7 100644 (file)
  */
 package org.apache.eagle.query.aggregate;
 
-public class AggregateFunctionNotSupportedException extends RuntimeException{
-       static final long serialVersionUID = -4548788354899625887L;
-       public AggregateFunctionNotSupportedException(){
-               super();
-       }
-       
-       public AggregateFunctionNotSupportedException(String message){
-               super(message);
-       }
+public class AggregateFunctionNotSupportedException extends RuntimeException {
+    static final long serialVersionUID = -4548788354899625887L;
+
+    public AggregateFunctionNotSupportedException() {
+        super();
+    }
+
+    public AggregateFunctionNotSupportedException(String message) {
+        super(message);
+    }
 }
index 8ac3b8c..73d4dc0 100755 (executable)
@@ -21,65 +21,65 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public enum AggregateFunctionType{
-       count("^(count)$"),
-       sum("^sum\\((.*)\\)$"),
-       avg("^avg\\((.*)\\)$"),
-       max("^max\\((.*)\\)$"),
-       min("^min\\((.*)\\)$");
-       
-       private Pattern pattern;
-       private AggregateFunctionType(String patternString){
-               this.pattern = Pattern.compile(patternString);
-       }
+public enum AggregateFunctionType {
+    count("^(count)$"),
+    sum("^sum\\((.*)\\)$"),
+    avg("^avg\\((.*)\\)$"),
+    max("^max\\((.*)\\)$"),
+    min("^min\\((.*)\\)$");
 
-       /**
-        * This method is thread safe
-        * match and retrieve back the aggregated fields, for count, aggregateFields can be null
-        * @param function
-        * @return
-        */
-       public AggregateFunctionTypeMatcher matcher(String function){
-               Matcher m = pattern.matcher(function);
+    private Pattern pattern;
+    private AggregateFunctionType(String patternString) {
+        this.pattern = Pattern.compile(patternString);
+    }
 
-               if(m.find()){
-                       return new AggregateFunctionTypeMatcher(this, true, m.group(1));
-               }else{
-                       return new AggregateFunctionTypeMatcher(this, false, null);
-               }
-       }
+    /**
+     * This method is thread safe
+     * match and retrieve back the aggregated fields, for count, aggregateFields can be null
+     * @param function
+     * @return
+     */
+    public AggregateFunctionTypeMatcher matcher(String function) {
+        Matcher m = pattern.matcher(function);
 
-       public static AggregateFunctionTypeMatcher matchAll(String function){
-               for(AggregateFunctionType type : values()){
-                       Matcher m = type.pattern.matcher(function);
-                       if(m.find()){
-                               return new AggregateFunctionTypeMatcher(type, true, m.group(1));
-                       }
-               }
-               return new AggregateFunctionTypeMatcher(null, false, null);
-       }
+        if (m.find()) {
+            return new AggregateFunctionTypeMatcher(this, true, m.group(1));
+        } else {
+            return new AggregateFunctionTypeMatcher(this, false, null);
+        }
+    }
 
-       public static byte[] serialize(AggregateFunctionType type){
-               return type.name().getBytes();
-       }
+    public static AggregateFunctionTypeMatcher matchAll(String function) {
+        for (AggregateFunctionType type : values()) {
+            Matcher m = type.pattern.matcher(function);
+            if (m.find()) {
+                return new AggregateFunctionTypeMatcher(type, true, m.group(1));
+            }
+        }
+        return new AggregateFunctionTypeMatcher(null, false, null);
+    }
 
-       public static AggregateFunctionType deserialize(byte[] type){
-               return valueOf(new String(type));
-       }
+    public static byte[] serialize(AggregateFunctionType type) {
+        return type.name().getBytes();
+    }
 
-       public static List<byte[]> toBytesList(List<AggregateFunctionType> types){
-               List<byte[]> result = new ArrayList<byte[]>();
-               for(AggregateFunctionType type:types){
-                       result.add(serialize(type));
-               }
-               return result;
-       }
+    public static AggregateFunctionType deserialize(byte[] type) {
+        return valueOf(new String(type));
+    }
 
-       public static List<AggregateFunctionType> fromBytesList(List<byte[]> types){
-               List<AggregateFunctionType> result = new ArrayList<AggregateFunctionType>();
-               for(byte[] bs:types){
-                       result.add(deserialize(bs));
-               }
-               return result;
-       }
+    public static List<byte[]> toBytesList(List<AggregateFunctionType> types) {
+        List<byte[]> result = new ArrayList<byte[]>();
+        for (AggregateFunctionType type:types) {
+            result.add(serialize(type));
+        }
+        return result;
+    }
+
+    public static List<AggregateFunctionType> fromBytesList(List<byte[]> types) {
+        List<AggregateFunctionType> result = new ArrayList<AggregateFunctionType>();
+        for (byte[] bs:types) {
+            result.add(deserialize(bs));
+        }
+        return result;
+    }
 }
\ No newline at end of file
index 6b2bc13..c829227 100644 (file)
 package org.apache.eagle.query.aggregate;
 
 public class AggregateFunctionTypeMatcher {
-       private final AggregateFunctionType type;
-       private final boolean matched;
-       private final String field;
+    private final AggregateFunctionType type;
+    private final boolean matched;
+    private final String field;
 
-       public AggregateFunctionTypeMatcher(AggregateFunctionType type, boolean matched, String field){
-               this.type = type;
-               this.matched = matched;
-               this.field = field;
-       }
-       
-       public boolean find(){
-               return this.matched;
-       }
-       
-       public String field(){
-               return this.field;
-       }
-       
-       public AggregateFunctionType type(){
-               return this.type;
-       }
+    public AggregateFunctionTypeMatcher(AggregateFunctionType type, boolean matched, String field) {
+        this.type = type;
+        this.matched = matched;
+        this.field = field;
+    }
+
+    public boolean find() {
+        return this.matched;
+    }
+
+    public String field() {
+        return this.field;
+    }
+
+    public AggregateFunctionType type() {
+        return this.type;
+    }
 }
index 616184d..7790f1e 100644 (file)
@@ -19,59 +19,70 @@ package org.apache.eagle.query.aggregate;
 import java.util.ArrayList;
 import java.util.List;
 
-public class AggregateParams{
-       List<String> groupbyFields;
-       boolean counting;
-       List<String> sumFunctionFields = new ArrayList<String>();
-       List<SortFieldOrder> sortFieldOrders = new ArrayList<SortFieldOrder>();
-       
-       public List<SortFieldOrder> getSortFieldOrders() {
-               return sortFieldOrders;
-       }
-       public void setSortFieldOrders(List<SortFieldOrder> sortFieldOrders) {
-               this.sortFieldOrders = sortFieldOrders;
-       }
-       public List<String> getGroupbyFields() {
-               return groupbyFields;
-       }
-       public void setGroupbyFields(List<String> groupbyFields) {
-               this.groupbyFields = groupbyFields;
-       }
-       public boolean isCounting() {
-               return counting;
-       }
-       public void setCounting(boolean counting) {
-               this.counting = counting;
-       }
-       public List<String> getSumFunctionFields() {
-               return sumFunctionFields;
-       }
-       public void setSumFunctionFields(List<String> sumFunctionFields) {
-               this.sumFunctionFields = sumFunctionFields;
-       }
-
-       public static class SortFieldOrder{
-               public static final String SORT_BY_AGGREGATE_KEY = "key";
-               public static final String SORT_BY_COUNT = "count";
-               private String field;
-               private boolean ascendant;
-               
-               public SortFieldOrder(String field, boolean ascendant) {
-                       super();
-                       this.field = field;
-                       this.ascendant = ascendant;
-               }
-               public String getField() {
-                       return field;
-               }
-               public void setField(String field) {
-                       this.field = field;
-               }
-               public boolean isAscendant() {
-                       return ascendant;
-               }
-               public void setAscendant(boolean ascendant) {
-                       this.ascendant = ascendant;
-               } 
-       }
+public class AggregateParams {
+    List<String> groupbyFields;
+    boolean counting;
+    List<String> sumFunctionFields = new ArrayList<String>();
+    List<SortFieldOrder> sortFieldOrders = new ArrayList<SortFieldOrder>();
+
+    public List<SortFieldOrder> getSortFieldOrders() {
+        return sortFieldOrders;
+    }
+
+    public void setSortFieldOrders(List<SortFieldOrder> sortFieldOrders) {
+        this.sortFieldOrders = sortFieldOrders;
+    }
+
+    public List<String> getGroupbyFields() {
+        return groupbyFields;
+    }
+
+    public void setGroupbyFields(List<String> groupbyFields) {
+        this.groupbyFields = groupbyFields;
+    }
+
+    public boolean isCounting() {
+        return counting;
+    }
+
+    public void setCounting(boolean counting) {
+        this.counting = counting;
+    }
+
+    public List<String> getSumFunctionFields() {
+        return sumFunctionFields;
+    }
+
+    public void setSumFunctionFields(List<String> sumFunctionFields) {
+        this.sumFunctionFields = sumFunctionFields;
+    }
+
+    public static class SortFieldOrder {
+        public static final String SORT_BY_AGGREGATE_KEY = "key";
+        public static final String SORT_BY_COUNT = "count";
+        private String field;
+        private boolean ascendant;
+
+        public SortFieldOrder(String field, boolean ascendant) {
+            super();
+            this.field = field;
+            this.ascendant = ascendant;
+        }
+
+        public String getField() {
+            return field;
+        }
+
+        public void setField(String field) {
+            this.field = field;
+        }
+
+        public boolean isAscendant() {
+            return ascendant;
+        }
+
+        public void setAscendant(boolean ascendant) {
+            this.ascendant = ascendant;
+        }
+    }
 }
\ No newline at end of file
index 9500574..ee58197 100644 (file)
@@ -20,75 +20,75 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class AggregateParamsValidator {
-       /**
-        * This method handle the following sytle syntax
-        * sum(numConfiguredMapSlots), count group by cluster, rack 
-        * 1. ensure that all gb fields must occur in outputField or outputTag
-        * 2. ensure that all summarized fields must occur in outputField, 
-        *    for example, for function=sum(numConfiguredMapSlots), numConfiguredMapSlots must occur in outputField  
-        * 3. groupby should be pre-appended with a root groupby field  
-        * @param outputTags
-        * @param outputFields
-        * @param groupbys
-        * @param functions
-        * @throws IllegalArgumentException
-        */
-       public static AggregateParams compileAggregateParams(List<String> outputTags, List<String> outputFields, List<String> groupbys, List<String> functions, List<String> sortFieldOrders)
-                       throws IllegalArgumentException, AggregateFunctionNotSupportedException{
-               AggregateParams aggParams = new AggregateParams();
-               // ensure that all gb fields must occur in outputField or outputTag
-               for(String groupby : groupbys){
-                       if(!outputTags.contains(groupby) && !outputFields.contains(groupby)){
-                               throw new IllegalArgumentException(groupby + ", All gb fields should appear in outputField list or outputTag list");
-                       }
-               }
-               
-               // parse functions and ensure that all summarized fields must occur in outputField
-               for(String function : functions){
-                       AggregateFunctionTypeMatcher m = AggregateFunctionType.count.matcher(function);
-                       if(m.find()){
-                               aggParams.setCounting(true);
-                               continue;
-                       }
+    /**
+     * This method handle the following sytle syntax
+     * sum(numConfiguredMapSlots), count group by cluster, rack
+     * 1. ensure that all gb fields must occur in outputField or outputTag
+     * 2. ensure that all summarized fields must occur in outputField,
+     *    for example, for function = sum(numConfiguredMapSlots), numConfiguredMapSlots must occur in outputField
+     * 3. groupby should be pre-appended with a root groupby field
+     * @param outputTags
+     * @param outputFields
+     * @param groupbys
+     * @param functions
+     * @throws IllegalArgumentException
+     */
+    public static AggregateParams compileAggregateParams(List<String> outputTags, List<String> outputFields, List<String> groupbys, List<String> functions, List<String> sortFieldOrders)
+        throws IllegalArgumentException, AggregateFunctionNotSupportedException {
+        AggregateParams aggParams = new AggregateParams();
+        // ensure that all gb fields must occur in outputField or outputTag
+        for (String groupby : groupbys) {
+            if (!outputTags.contains(groupby) && !outputFields.contains(groupby)) {
+                throw new IllegalArgumentException(groupby + ", All gb fields should appear in outputField list or outputTag list");
+            }
+        }
 
-                       m = AggregateFunctionType.sum.matcher(function);
-                       if(m.find()){
-                               if(!outputFields.contains(m.field())){
-                                       throw new IllegalArgumentException(m.field() + ", All summary function fields should appear in outputField list");
-                               }
-                               aggParams.getSumFunctionFields().add(m.field());
-                               continue;
-                       }
-                       
-                       throw new AggregateFunctionNotSupportedException("function " + function + " is not supported, only count, sum aggregate functions are now supported");
-               }
-               
-               //  groupby should be pre-appended with a root groupby field
-               List<String> groupbyFields = new ArrayList<String>();
-               groupbyFields.add(Aggregator.GROUPBY_ROOT_FIELD_NAME);
-               groupbyFields.addAll(groupbys);
-               aggParams.setGroupbyFields(groupbyFields);
+        // parse functions and ensure that all summarized fields must occur in outputField
+        for (String function : functions) {
+            AggregateFunctionTypeMatcher m = AggregateFunctionType.count.matcher(function);
+            if (m.find()) {
+                aggParams.setCounting(true);
+                continue;
+            }
 
-               // check sort field orders
-               boolean byKeySorting = false;
-               for(String sortFieldOrder : sortFieldOrders){
-                       AggregateParams.SortFieldOrder sfo = SortFieldOrderType.matchAll(sortFieldOrder);
-                       if(sfo == null){
-                               throw new IllegalArgumentException(sortFieldOrder + ", All sort field order should be <field>=(asc|desc)");
-                       }
-                       if(sfo.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)){
-                               byKeySorting =  true;
-                       }else if(!sfo.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)){
-                               if(!groupbys.contains(sfo.getField()) && !aggParams.getSumFunctionFields().contains(sfo.getField())){
-                                       throw new IllegalArgumentException(sortFieldOrder + ", All sort field order should appear in gb or function fields");
-                               }
-                       }
-                       aggParams.getSortFieldOrders().add(sfo);
-               }
-               // always add key ascendant to the last aggregation key if not specified
-               if(!byKeySorting){
-                       aggParams.getSortFieldOrders().add(new AggregateParams.SortFieldOrder(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY, true));
-               }
-               return aggParams;
-       }
+            m = AggregateFunctionType.sum.matcher(function);
+            if (m.find()) {
+                if (!outputFields.contains(m.field())) {
+                    throw new IllegalArgumentException(m.field() + ", All summary function fields should appear in outputField list");
+                }
+                aggParams.getSumFunctionFields().add(m.field());
+                continue;
+            }
+
+            throw new AggregateFunctionNotSupportedException("function " + function + " is not supported, only count, sum aggregate functions are now supported");
+        }
+
+        //  groupby should be pre-appended with a root groupby field
+        List<String> groupbyFields = new ArrayList<String>();
+        groupbyFields.add(Aggregator.GROUPBY_ROOT_FIELD_NAME);
+        groupbyFields.addAll(groupbys);
+        aggParams.setGroupbyFields(groupbyFields);
+
+        // check sort field orders
+        boolean byKeySorting = false;
+        for (String sortFieldOrder : sortFieldOrders) {
+            AggregateParams.SortFieldOrder sfo = SortFieldOrderType.matchAll(sortFieldOrder);
+            if (sfo == null) {
+                throw new IllegalArgumentException(sortFieldOrder + ", All sort field order should be <field>=(asc|desc)");
+            }
+            if (sfo.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)) {
+                byKeySorting =  true;
+            } else if (!sfo.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)) {
+                if (!groupbys.contains(sfo.getField()) && !aggParams.getSumFunctionFields().contains(sfo.getField())) {
+                    throw new IllegalArgumentException(sortFieldOrder + ", All sort field order should appear in gb or function fields");
+                }
+            }
+            aggParams.getSortFieldOrders().add(sfo);
+        }
+        // always add key ascendant to the last aggregation key if not specified
+        if (!byKeySorting) {
+            aggParams.getSortFieldOrders().add(new AggregateParams.SortFieldOrder(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY, true));
+        }
+        return aggParams;
+    }
 }
index 1015c2a..23ebf4f 100644 (file)
@@ -18,35 +18,42 @@ package org.apache.eagle.query.aggregate;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class AggregateResultAPIEntity {
-       private boolean success;
-       private String exception;
-       private long elapsedms;
-       private AggregateAPIEntity entity;
-
-       public long getElapsedms() {
-               return elapsedms;
-       }
-       public void setElapsedms(long elapsedms) {
-               this.elapsedms = elapsedms;
-       }
-       public AggregateAPIEntity getEntity() {
-               return entity;
-       }
-       public void setEntity(AggregateAPIEntity entity) {
-               this.entity = entity;
-       }
-       public boolean isSuccess() {
-               return success;
-       }
-       public void setSuccess(boolean success) {
-               this.success = success;
-       }
-       public String getException() {
-               return exception;
-       }
-       public void setException(String exception) {
-               this.exception = exception;
-       }
+    private boolean success;
+    private String exception;
+    private long elapsedms;
+    private AggregateAPIEntity entity;
+
+    public long getElapsedms() {
+        return elapsedms;
+    }
+
+    public void setElapsedms(long elapsedms) {
+        this.elapsedms = elapsedms;
+    }
+
+    public AggregateAPIEntity getEntity() {
+        return entity;
+    }
+
+    public void setEntity(AggregateAPIEntity entity) {
+        this.entity = entity;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
 }
index de911e5..890b7ea 100644 (file)
@@ -26,143 +26,145 @@ import org.slf4j.LoggerFactory;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
 public class Aggregator {
-       private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class);
-       public static final String GROUPBY_ROOT_FIELD_NAME = "site";
-       public static final String GROUPBY_ROOT_FIELD_VALUE = "xyz";
-       public static final String UNASSIGNED_GROUPBY_ROOT_FIELD_NAME = "unassigned";
-       
-       private final AggregateAPIEntityFactory factory;
-       private final AggregateAPIEntity root;
-       private final List<String> groupbys;
-       private final List<String> sumFunctionFields;
-       private final boolean counting;
-       
-       public Aggregator(AggregateAPIEntityFactory factory, AggregateAPIEntity root, List<String> groupbys, boolean counting, List<String> sumFunctionFields){
-               this.factory = factory;
-               this.root = root;
-               this.groupbys = groupbys;
-               this.sumFunctionFields = sumFunctionFields;
-               this.counting = counting;
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class);
+    public static final String GROUPBY_ROOT_FIELD_NAME = "site";
+    public static final String GROUPBY_ROOT_FIELD_VALUE = "xyz";
+    public static final String UNASSIGNED_GROUPBY_ROOT_FIELD_NAME = "unassigned";
 
-       /**
-        * this locate result can be cached? we don't need check if it's TaggedLogAPIEntity each time when iterating entities
-        * @param groupby
-        * @param obj
-        * @return
-        * @throws Exception
-        */
-       private String locateGroupbyField(String groupby, TaggedLogAPIEntity obj){
-               if(groupby.equals(GROUPBY_ROOT_FIELD_NAME)){
-                       return GROUPBY_ROOT_FIELD_VALUE;
-               }
-               // check tag first
-               String tagv = obj.getTags().get(groupby);
-               if(tagv != null)
-                       return tagv;
-               // check against pojo, or qualifierValues
-               String fn = groupby.substring(0,1).toUpperCase()+groupby.substring(1, groupby.length());
-               try{
-                       Method getM = obj.getClass().getMethod("get"+fn);
-                       Object value = getM.invoke(obj);
-                       return (String)value;
-               }catch(Exception ex){
-                       LOG.warn(groupby + " field is in neither tags nor fields, " + ex.getMessage());
-                       return null;
-               }
-       }
-       
-       /**
-        * accumulate a list of entities
-        * @param entities
-        * @throws Exception
-        */
-       public void accumulateAll(List<TaggedLogAPIEntity> entities) throws Exception{
-               for(TaggedLogAPIEntity entity : entities){
-                       accumulate(entity);
-               }
-       }
-       
-       /**
-        * currently only group by tags
-        * groupbys' first item always is site, which is a reserved field 
-        */
-       public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-               AggregateAPIEntity current = root;
-               for(String groupby : groupbys){
-                       // TODO tagv is empty, so what to do? use a reserved field_name "unassigned" ?
-                       // TODO we should support all Pojo with java bean style object
-                       String tagv = locateGroupbyField(groupby, entity);
-                       if(tagv == null || tagv.isEmpty()){
-                               tagv = UNASSIGNED_GROUPBY_ROOT_FIELD_NAME;
-                       }
-                       Map<String, AggregateAPIEntity> children = current.getEntityList();
-                       if(children.get(tagv) == null){
-                               children.put(tagv, factory.create());
-                               current.setNumDirectDescendants(current.getNumDirectDescendants()+1);
-                       }
-                       AggregateAPIEntity child = children.get(tagv);
-                       // go through all aggregate functions including count, summary etc.                     
-                       if(counting)
-                               count(child);
-                       for(String sumFunctionField : sumFunctionFields){
-                               sum(child, entity, sumFunctionField);
-                       }
-                       
-                       current = child;
-               }
-               
-       }
+    private final AggregateAPIEntityFactory factory;
+    private final AggregateAPIEntity root;
+    private final List<String> groupbys;
+    private final List<String> sumFunctionFields;
+    private final boolean counting;
 
-       
-       /**
-        * use java bean specifications?
-        * reflection is not efficient, let us find out solutions
-        */
-       private void sum(Object targetObj, TaggedLogAPIEntity srcObj, String fieldName) throws Exception{
-               try{
-                       String fn = fieldName.substring(0,1).toUpperCase()+fieldName.substring(1, fieldName.length());
-                       Method srcGetMethod = srcObj.getClass().getMethod("get"+fn);
-                       Object srcValue = srcGetMethod.invoke(srcObj);
-                       if(srcValue == null){
-                               return;  // silently don't count this source object
-                       }
-                       Method targetGetMethod = targetObj.getClass().getMethod("get"+fn);
-                       Object targetValue = targetGetMethod.invoke(targetObj);
-                       if(targetValue instanceof Long){
-                               Method setM = targetObj.getClass().getMethod("set"+fn, long.class);
-                               Long tmp1 = (Long)targetValue;
-                               // TODO, now source object always have type "java.lang.String", later on we should support various type including integer type
-                               Long tmp2 = null;
-                               if(srcValue instanceof String){
-                                       tmp2 = Long.valueOf((String)srcValue);
-                               }else if(srcValue instanceof Long){
-                                       tmp2 = (Long)srcValue;
-                               }else{
-                                       throw new IllegalAggregateFieldTypeException(srcValue.getClass().toString() + " type is not support. The source type must be Long or String");
-                               }
-                               setM.invoke(targetObj, tmp1.longValue()+tmp2.longValue());
-                       }else if(targetValue instanceof Double){
-                               Method setM = targetObj.getClass().getMethod("set"+fn, double.class);
-                               Double tmp1 = (Double)targetValue;
-                               String src = (String) srcValue;
-                               Double tmp2 = Double.valueOf(src);
-                               setM.invoke(targetObj, tmp1.doubleValue()+tmp2.doubleValue());
-                       }else{
-                               throw new IllegalAggregateFieldTypeException(targetValue.getClass().toString() + " type is not support. The target type must be long or double");
-                       }
-               }catch(Exception ex){
-                       LOG.error("Cannot do sum aggregation for field " + fieldName, ex);
-                       throw ex;
-               }
-       }
-       
-       /**
-        * count possible not only count for number of descendants but also count for not-null fields 
-        * @param targetObj
-        * @throws Exception
-        */
-       private void count(AggregateAPIEntity targetObj) throws Exception{
-               targetObj.setNumTotalDescendants(targetObj.getNumTotalDescendants()+1);
-       }
+    public Aggregator(AggregateAPIEntityFactory factory, AggregateAPIEntity root, List<String> groupbys, boolean counting, List<String> sumFunctionFields) {
+        this.factory = factory;
+        this.root = root;
+        this.groupbys = groupbys;
+        this.sumFunctionFields = sumFunctionFields;
+        this.counting = counting;
+    }
+
+    /**
+     * this locate result can be cached? we don't need check if it's TaggedLogAPIEntity each time when iterating entities
+     * @param groupby
+     * @param obj
+     * @return
+     * @throws Exception
+     */
+    private String locateGroupbyField(String groupby, TaggedLogAPIEntity obj) {
+        if (groupby.equals(GROUPBY_ROOT_FIELD_NAME)) {
+            return GROUPBY_ROOT_FIELD_VALUE;
+        }
+        // check tag first
+        String tagv = obj.getTags().get(groupby);
+        if (tagv != null) {
+            return tagv;
+        }
+        // check against pojo, or qualifierValues
+        String fn = groupby.substring(0,1).toUpperCase() + groupby.substring(1, groupby.length());
+        try {
+            Method getM = obj.getClass().getMethod("get" + fn);
+            Object value = getM.invoke(obj);
+            return (String)value;
+        } catch (Exception ex) {
+            LOG.warn(groupby + " field is in neither tags nor fields, " + ex.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     * accumulate a list of entities
+     * @param entities
+     * @throws Exception
+     */
+    public void accumulateAll(List<TaggedLogAPIEntity> entities) throws Exception {
+        for (TaggedLogAPIEntity entity : entities) {
+            accumulate(entity);
+        }
+    }
+
+    /**
+     * currently only group by tags
+     * groupbys' first item always is site, which is a reserved field
+     */
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        AggregateAPIEntity current = root;
+        for (String groupby : groupbys) {
+            // TODO tagv is empty, so what to do? use a reserved field_name "unassigned" ?
+            // TODO we should support all Pojo with java bean style object
+            String tagv = locateGroupbyField(groupby, entity);
+            if (tagv == null || tagv.isEmpty()) {
+                tagv = UNASSIGNED_GROUPBY_ROOT_FIELD_NAME;
+            }
+            Map<String, AggregateAPIEntity> children = current.getEntityList();
+            if (children.get(tagv) == null) {
+                children.put(tagv, factory.create());
+                current.setNumDirectDescendants(current.getNumDirectDescendants() + 1);
+            }
+            AggregateAPIEntity child = children.get(tagv);
+            // go through all aggregate functions including count, summary etc.
+            if (counting) {
+                count(child);
+            }
+            for (String sumFunctionField : sumFunctionFields) {
+                sum(child, entity, sumFunctionField);
+            }
+
+            current = child;
+        }
+
+    }
+
+
+    /**
+     * use java bean specifications?
+     * reflection is not efficient, let us find out solutions
+     */
+    private void sum(Object targetObj, TaggedLogAPIEntity srcObj, String fieldName) throws Exception {
+        try {
+            String fn = fieldName.substring(0,1).toUpperCase() + fieldName.substring(1, fieldName.length());
+            Method srcGetMethod = srcObj.getClass().getMethod("get" + fn);
+            Object srcValue = srcGetMethod.invoke(srcObj);
+            if (srcValue == null) {
+                return;  // silently don't count this source object
+            }
+            Method targetGetMethod = targetObj.getClass().getMethod("get" + fn);
+            Object targetValue = targetGetMethod.invoke(targetObj);
+            if (targetValue instanceof Long) {
+                Method setM = targetObj.getClass().getMethod("set" + fn, long.class);
+                Long tmp1 = (Long)targetValue;
+                // TODO, now source object always have type "java.lang.String", later on we should support various type including integer type
+                Long tmp2 = null;
+                if (srcValue instanceof String) {
+                    tmp2 = Long.valueOf((String)srcValue);
+                } else if (srcValue instanceof Long) {
+                    tmp2 = (Long)srcValue;
+                } else {
+                    throw new IllegalAggregateFieldTypeException(srcValue.getClass().toString() + " type is not support. The source type must be Long or String");
+                }
+                setM.invoke(targetObj, tmp1.longValue() + tmp2.longValue());
+            } else if (targetValue instanceof Double) {
+                Method setM = targetObj.getClass().getMethod("set" + fn, double.class);
+                Double tmp1 = (Double)targetValue;
+                String src = (String) srcValue;
+                Double tmp2 = Double.valueOf(src);
+                setM.invoke(targetObj, tmp1.doubleValue() + tmp2.doubleValue());
+            } else {
+                throw new IllegalAggregateFieldTypeException(targetValue.getClass().toString() + " type is not support. The target type must be long or double");
+            }
+        } catch (Exception ex) {
+            LOG.error("Cannot do sum aggregation for field " + fieldName, ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * count possible not only count for number of descendants but also count for not-null fields
+     * @param targetObj
+     * @throws Exception
+     */
+    private void count(AggregateAPIEntity targetObj) throws Exception {
+        targetObj.setNumTotalDescendants(targetObj.getNumTotalDescendants() + 1);
+    }
 }
\ No newline at end of file
index a00c5ad..be2c636 100644 (file)
@@ -24,61 +24,62 @@ import java.util.Map;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
 public class BucketQuery {
-       public final static String UNASSIGNED_BUCKET = "unassigned"; 
-       private List<String> bucketFields;
-       private int limit;
-       private Map<String, Object> root = new HashMap<String, Object>();
-       
-       public BucketQuery(List<String> bucketFields, int limit){
-               this.bucketFields = bucketFields;
-               this.limit = limit;
-       }
-       
-       @SuppressWarnings("unchecked")
-       public void put(TaggedLogAPIEntity entity){
-               Map<String, Object> current = root;
-               int bucketCount = bucketFields.size();
-               if(bucketCount <= 0)
-                       return; // silently return
-               int i = 0;
-               String bucketFieldValue = null;
-               for(; i<bucketCount; i++){
-                       String bucketField = bucketFields.get(i);
-                       bucketFieldValue = entity.getTags().get(bucketField);
-                       if(bucketFieldValue == null || bucketFieldValue.isEmpty()){
-                               bucketFieldValue = UNASSIGNED_BUCKET;
-                       }
-                       // for last bucket, bypass the following logic
-                       if(i == bucketCount-1){
-                               break;
-                       }
-                               
-                       if(current.get(bucketFieldValue) == null){
-                               current.put(bucketFieldValue, new HashMap<String, Object>());
-                       }
-                       // for the last level of bucket, it is not Map, instead it is List<TaggedLogAPIEntity> 
-                       current = (Map<String, Object>)current.get(bucketFieldValue);
-               }
-               List<TaggedLogAPIEntity> bucketContent = (List<TaggedLogAPIEntity>)current.get(bucketFieldValue);
-               if(bucketContent == null){
-                       bucketContent = new ArrayList<TaggedLogAPIEntity>();
-                       current.put(bucketFieldValue, bucketContent);
-               }
-               
-               if(bucketContent.size() >= limit){
-                       return;
-               }else{
-                       bucketContent.add(entity);
-               }
-       }
-       
-       public void batchPut(List<TaggedLogAPIEntity> entities){
-               for(TaggedLogAPIEntity entity : entities){
-                       put(entity);
-               }
-       }
-       
-       public Map<String, Object> get(){
-               return root;
-       }
+    public static final String UNASSIGNED_BUCKET = "unassigned";
+    private List<String> bucketFields;
+    private int limit;
+    private Map<String, Object> root = new HashMap<String, Object>();
+
+    public BucketQuery(List<String> bucketFields, int limit) {
+        this.bucketFields = bucketFields;
+        this.limit = limit;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void put(TaggedLogAPIEntity entity) {
+        Map<String, Object> current = root;
+        int bucketCount = bucketFields.size();
+        if (bucketCount <= 0) {
+            return; // silently return
+        }
+        int i = 0;
+        String bucketFieldValue = null;
+        for (; i < bucketCount; i++) {
+            String bucketField = bucketFields.get(i);
+            bucketFieldValue = entity.getTags().get(bucketField);
+            if (bucketFieldValue == null || bucketFieldValue.isEmpty()) {
+                bucketFieldValue = UNASSIGNED_BUCKET;
+            }
+            // for last bucket, bypass the following logic
+            if (i == bucketCount - 1) {
+                break;
+            }
+
+            if (current.get(bucketFieldValue) == null) {
+                current.put(bucketFieldValue, new HashMap<String, Object>());
+            }
+            // for the last level of bucket, it is not Map, instead it is List<TaggedLogAPIEntity>
+            current = (Map<String, Object>)current.get(bucketFieldValue);
+        }
+        List<TaggedLogAPIEntity> bucketContent = (List<TaggedLogAPIEntity>)current.get(bucketFieldValue);
+        if (bucketContent == null) {
+            bucketContent = new ArrayList<TaggedLogAPIEntity>();
+            current.put(bucketFieldValue, bucketContent);
+        }
+
+        if (bucketContent.size() >= limit) {
+            return;
+        } else {
+            bucketContent.add(entity);
+        }
+    }
+
+    public void batchPut(List<TaggedLogAPIEntity> entities) {
+        for (TaggedLogAPIEntity entity : entities) {
+            put(entity);
+        }
+    }
+
+    public Map<String, Object> get() {
+        return root;
+    }
 }
index 3e3e739..05f7fb8 100644 (file)
  */
 package org.apache.eagle.query.aggregate;
 
-public class IllegalAggregateFieldTypeException extends RuntimeException{
-       static final long serialVersionUID = -4548788354899625887L;
-       public IllegalAggregateFieldTypeException(){
-               super();
-       }
-       
-       public IllegalAggregateFieldTypeException(String message){
-               super(message + ", only count and sum are support");
-       }
+public class IllegalAggregateFieldTypeException extends RuntimeException {
+    static final long serialVersionUID = -4548788354899625887L;
+
+    public IllegalAggregateFieldTypeException() {
+        super();
+    }
+
+    public IllegalAggregateFieldTypeException(String message) {
+        super(message + ", only count and sum are support");
+    }
 }
index b801255..d27e10e 100644 (file)
@@ -27,75 +27,79 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PostAggregateSorting {
-       private static final Logger LOG = LoggerFactory.getLogger(PostAggregateSorting.class);
-       
-       private static SortedSet<Map.Entry<String, AggregateAPIEntity>> sortByValue(Map<String, AggregateAPIEntity> map, List<AggregateParams.SortFieldOrder> sortedFields) {
-           SortedSet<Map.Entry<String, AggregateAPIEntity>> sortedEntries = new TreeSet<Map.Entry<String, AggregateAPIEntity>>(new MapKeyValueComparator(sortedFields));
-           sortedEntries.addAll(map.entrySet());
-           return sortedEntries;
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(PostAggregateSorting.class);
 
-       /**
-        * recursively populate sorted list from entity list
-        * @param entity
-        */
-       public static void sort(AggregateAPIEntity entity, List<AggregateParams.SortFieldOrder> sortFieldOrders){
-               // sort should internally add key field to AggregateAPIEntity before the sorting starts as "key" could be sorted against
-               Map<String, AggregateAPIEntity> children = entity.getEntityList();
-               for(Map.Entry<String, AggregateAPIEntity> e : children.entrySet()){
-                       e.getValue().setKey(e.getKey());
-               }
-               SortedSet<Map.Entry<String, AggregateAPIEntity>> set = sortByValue(children, sortFieldOrders);
-               for(Map.Entry<String, AggregateAPIEntity> entry : set){
-                       entity.getSortedList().add(entry.getValue());
-               }
-               for(Map.Entry<String, AggregateAPIEntity> entry : entity.getEntityList().entrySet()){
-                       sort(entry.getValue(), sortFieldOrders);
-               }
-               entity.setEntityList(null);
-       }
+    private static SortedSet<Map.Entry<String, AggregateAPIEntity>> sortByValue(Map<String, AggregateAPIEntity> map, List<AggregateParams.SortFieldOrder> sortedFields) {
+        SortedSet<Map.Entry<String, AggregateAPIEntity>> sortedEntries = new TreeSet<Map.Entry<String, AggregateAPIEntity>>(new MapKeyValueComparator(sortedFields));
+        sortedEntries.addAll(map.entrySet());
+        return sortedEntries;
+    }
 
-       private static class MapKeyValueComparator implements Comparator<Map.Entry<String, AggregateAPIEntity>>{
-               private List<AggregateParams.SortFieldOrder> sortedFieldOrders;
-               public MapKeyValueComparator(List<AggregateParams.SortFieldOrder> sortedFields){
-                       this.sortedFieldOrders = sortedFields;
-               }
-               @Override
-        public int compare(Map.Entry<String, AggregateAPIEntity> e1, Map.Entry<String, AggregateAPIEntity> e2){
-                       int r = 0;
-                       AggregateAPIEntity entity1 = e1.getValue();
-                       AggregateAPIEntity entity2 = e2.getValue();
-            for(AggregateParams.SortFieldOrder sortFieldOrder : sortedFieldOrders){
-               // TODO count should not be literal, compare numTotalDescendants
-               if(sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)){
-                       long tmp = entity1.getNumTotalDescendants() - entity2.getNumTotalDescendants();
-                       r = (tmp == 0) ? 0 : ((tmp > 0) ? 1 : -1);
-               }else if(sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)){
-                       r = entity1.getKey().compareTo(entity2.getKey());
-               }else{
-                       try{
-                               String sortedField = sortFieldOrder.getField();
-                               String tmp1 = sortedField.substring(0, 1).toUpperCase()+sortedField.substring(1);
-                               Method getMethod1 = entity1.getClass().getMethod("get"+tmp1);
-                               Object r1 = getMethod1.invoke(entity1);
-                               Long comp1 = (Long)r1;
-                               String tmp2 = sortedField.substring(0, 1).toUpperCase()+sortedField.substring(1);
-                               Method getMethod2 = entity2.getClass().getMethod("get"+tmp2);
-                               Object r2 = getMethod2.invoke(entity2);
-                               Long comp2 = (Long)r2;
-                               r = comp1.compareTo(comp2);
-                       }catch(Exception ex){
-                               LOG.error("Can not get corresponding field for sorting", ex);
-                               r = 0;
-                       }
-               }
-               if(r == 0) continue;
-                       if(!sortFieldOrder.isAscendant()){
-                               r = -r;
-                       }
-                       return r;
-            }  
-                       return r;
+    /**
+     * recursively populate sorted list from entity list
+     * @param entity
+     */
+    public static void sort(AggregateAPIEntity entity, List<AggregateParams.SortFieldOrder> sortFieldOrders) {
+        // sort should internally add key field to AggregateAPIEntity before the sorting starts as "key" could be sorted against
+        Map<String, AggregateAPIEntity> children = entity.getEntityList();
+        for (Map.Entry<String, AggregateAPIEntity> e : children.entrySet()) {
+            e.getValue().setKey(e.getKey());
         }
-       }
+        SortedSet<Map.Entry<String, AggregateAPIEntity>> set = sortByValue(children, sortFieldOrders);
+        for (Map.Entry<String, AggregateAPIEntity> entry : set) {
+            entity.getSortedList().add(entry.getValue());
+        }
+        for (Map.Entry<String, AggregateAPIEntity> entry : entity.getEntityList().entrySet()) {
+            sort(entry.getValue(), sortFieldOrders);
+        }
+        entity.setEntityList(null);
+    }
+
+    private static class MapKeyValueComparator implements Comparator<Map.Entry<String, AggregateAPIEntity>> {
+        private List<AggregateParams.SortFieldOrder> sortedFieldOrders;
+
+        public MapKeyValueComparator(List<AggregateParams.SortFieldOrder> sortedFields) {
+            this.sortedFieldOrders = sortedFields;
+        }
+
+        @Override
+        public int compare(Map.Entry<String, AggregateAPIEntity> e1, Map.Entry<String, AggregateAPIEntity> e2) {
+            int r = 0;
+            AggregateAPIEntity entity1 = e1.getValue();
+            AggregateAPIEntity entity2 = e2.getValue();
+            for (AggregateParams.SortFieldOrder sortFieldOrder : sortedFieldOrders) {
+                // TODO count should not be literal, compare numTotalDescendants
+                if (sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)) {
+                    long tmp = entity1.getNumTotalDescendants() - entity2.getNumTotalDescendants();
+                    r = (tmp == 0) ? 0 : ((tmp > 0) ? 1 : -1);
+                } else if (sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)) {
+                    r = entity1.getKey().compareTo(entity2.getKey());
+                } else {
+                    try {
+                        String sortedField = sortFieldOrder.getField();
+                        String tmp1 = sortedField.substring(0, 1).toUpperCase() + sortedField.substring(1);
+                        Method getMethod1 = entity1.getClass().getMethod("get" + tmp1);
+                        Object r1 = getMethod1.invoke(entity1);
+                        Long comp1 = (Long)r1;
+                        String tmp2 = sortedField.substring(0, 1).toUpperCase() + sortedField.substring(1);
+                        Method getMethod2 = entity2.getClass().getMethod("get" + tmp2);
+                        Object r2 = getMethod2.invoke(entity2);
+                        Long comp2 = (Long)r2;
+                        r = comp1.compareTo(comp2);
+                    } catch (Exception ex) {
+                        LOG.error("Can not get corresponding field for sorting", ex);
+                        r = 0;
+                    }
+                }
+                if (r == 0) {
+                    continue;
+                }
+                if (!sortFieldOrder.isAscendant()) {
+                    r = -r;
+                }
+                return r;
+            }
+            return r;
+        }
+    }
 }
index 6d47c7f..30a51d6 100644 (file)
@@ -20,40 +20,41 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public enum SortFieldOrderType {
-       key("^(key)=(asc|desc)$"),
-       count("^(count)=(asc|desc)$"),
-       sum("^sum\\((.*)\\)=(asc|desc)$"),
-       avg("^avg\\((.*)\\)(asc|desc)$"),
-       max("^max\\((.*)\\)(asc|desc)$"),
-       min("^min\\((.*)\\)(asc|desc)$");
-       
-       private Pattern pattern;
-       private SortFieldOrderType(String patternString){
-               this.pattern = Pattern.compile(patternString);
-       }
+    key("^(key)=(asc|desc)$"),
+    count("^(count)=(asc|desc)$"),
+    sum("^sum\\((.*)\\)=(asc|desc)$"),
+    avg("^avg\\((.*)\\)(asc|desc)$"),
+    max("^max\\((.*)\\)(asc|desc)$"),
+    min("^min\\((.*)\\)(asc|desc)$");
 
-       /**
-        * This method is thread safe
-        * match and retrieve back the aggregated fields, for count, aggregateFields can be null
-        * @param sortFieldOrder
-        * @return
-        */
-       public SortFieldOrderTypeMatcher matcher(String sortFieldOrder){
-               Matcher m = pattern.matcher(sortFieldOrder);
-               
-               if(m.find()){
-                       return new SortFieldOrderTypeMatcher(true, m.group(1), m.group(2));
-               }else{
-                       return new SortFieldOrderTypeMatcher(false, null, null);
-               }
-       }
-       
-       public static AggregateParams.SortFieldOrder matchAll(String sortFieldOrder){
-               for(SortFieldOrderType type : SortFieldOrderType.values()){
-                       SortFieldOrderTypeMatcher m = type.matcher(sortFieldOrder);
-                       if(m.find())
-                               return m.sortFieldOrder();
-               }
-               return null;
-       }
+    private Pattern pattern;
+    private SortFieldOrderType(String patternString) {
+        this.pattern = Pattern.compile(patternString);
+    }
+
+    /**
+     * This method is thread safe
+     * match and retrieve back the aggregated fields, for count, aggregateFields can be null
+     * @param sortFieldOrder
+     * @return
+     */
+    public SortFieldOrderTypeMatcher matcher(String sortFieldOrder) {
+        Matcher m = pattern.matcher(sortFieldOrder);
+
+        if (m.find()) {
+            return new SortFieldOrderTypeMatcher(true, m.group(1), m.group(2));
+        } else {
+            return new SortFieldOrderTypeMatcher(false, null, null);
+        }
+    }
+
+    public static AggregateParams.SortFieldOrder matchAll(String sortFieldOrder) {
+        for (SortFieldOrderType type : SortFieldOrderType.values()) {
+            SortFieldOrderTypeMatcher m = type.matcher(sortFieldOrder);
+            if (m.find()) {
+                return m.sortFieldOrder();
+            }
+        }
+        return null;
+    }
 }
index 0b4d408..8ef5c28 100644 (file)
@@ -18,21 +18,21 @@ package org.apache.eagle.query.aggregate;
 
 
 public class SortFieldOrderTypeMatcher {
-       private boolean matched;
-       private AggregateParams.SortFieldOrder sortFieldOrder;
+    private boolean matched;
+    private AggregateParams.SortFieldOrder sortFieldOrder;
 
-       public SortFieldOrderTypeMatcher(boolean matched, String field, String order){
-               this.matched = matched;
-               if(matched){
-                       this.sortFieldOrder = new AggregateParams.SortFieldOrder(field, order.equals("asc"));
-               }
-       }
-       
-       public boolean find(){
-               return this.matched;
-       }
-       
-       public AggregateParams.SortFieldOrder sortFieldOrder(){
-               return this.sortFieldOrder;
-       }
+    public SortFieldOrderTypeMatcher(boolean matched, String field, String order) {
+        this.matched = matched;
+        if (matched) {
+            this.sortFieldOrder = new AggregateParams.SortFieldOrder(field, order.equals("asc"));
+        }
+    }
+
+    public boolean find() {
+        return this.matched;
+    }
+
+    public AggregateParams.SortFieldOrder sortFieldOrder() {
+        return this.sortFieldOrder;
+    }
 }
index 83c683c..90abf2c 100755 (executable)
  */
 package org.apache.eagle.query.aggregate.raw;
 
-public abstract class Function{
-       private int count = 0;
-       protected void incrCount(int num){ count += num; }
-       public int count(){ return count; }
-       public abstract void run(double v,int count);
-       public void run(double v){ run(v,1); }
-       public abstract double result();
-
-       public static class Avg extends Function {
-               private double total;
-               public Avg(){
-                       this.total = 0.0;
-               }
-               @Override
-               public void run(double v,int count){
-                       this.incrCount(count);
-                       total += v;
-               }
-               @Override
-               public double result(){
-                       return this.total/this.count();
-               }
-       }
-
-       public static class Max extends Function {
-               private double maximum;
-               public Max(){
-                       // TODO is this a bug, or only positive numeric calculation is supported
-                       this.maximum = 0.0;
-               }
-
-               @Override
-               public void run(double v,int count){
-                       this.incrCount(count);
-                       if(v > maximum){
-                               maximum = v;
-                       }
-               }
-
-               @Override
-               public double result(){
-                       return maximum;
-               }
-       }
-
-       public static class Min extends Function {
-               private double minimum;
-               public Min(){
-                       // TODO is this a bug, or only positive numeric calculation is supported
-                       this.minimum = Double.MAX_VALUE;
-               }
-               @Override
-               public void run(double v,int count){
-                       this.incrCount(count);
-                       if(v < minimum){
-                               minimum = v;
-                       }
-               }
-
-               @Override
-               public double result(){
-                       return minimum;
-               }
-       }
-
-       public static class Sum extends Function {
-               private double summary;
-               public Sum(){
-                       this.summary = 0.0;
-               }
-               @Override
-               public void run(double v,int count){
-                       this.incrCount(count);
-                       this.summary += v;
-               }
-
-               @Override
-               public double result(){
-                       return this.summary;
-               }
-       }
-
-       public static class Count extends Sum{
-               public Count(){
-                       super();
-               }
-       }
+public abstract class Function {
+    private int count = 0;
+
+    protected void incrCount(int num) {
+        count += num;
+    }
+
+    public int count() {
+        return count;
+    }
+
+    public abstract void run(double v,int count);
+
+    public void run(double v) {
+        run(v, 1);
+    }
+
+    public abstract double result();
+
+    public static class Avg extends Function {
+        private double total;
+
+        public Avg() {
+            this.total = 0.0;
+        }
+
+        @Override
+        public void run(double v, int count) {
+            this.incrCount(count);
+            total += v;
+        }
+
+        @Override
+        public double result() {
+            return this.total / this.count();
+        }
+    }
+
+    public static class Max extends Function {
+        private double maximum;
+
+        public Max() {
+            // TODO is this a bug, or only positive numeric calculation is supported
+            this.maximum = 0.0;
+        }
+
+        @Override
+        public void run(double v,int count) {
+            this.incrCount(count);
+            if (v > maximum) {
+                maximum = v;
+            }
+        }
+
+        @Override
+        public double result() {
+            return maximum;
+        }
+    }
+
+    public static class Min extends Function {
+        private double minimum;
+
+        public Min() {
+            // TODO is this a bug, or only positive numeric calculation is supported
+            this.minimum = Double.MAX_VALUE;
+        }
+
+        @Override
+        public void run(double v,int count) {
+            this.incrCount(count);
+            if (v < minimum) {
+                minimum = v;
+            }
+        }
+
+        @Override
+        public double result() {
+            return minimum;
+        }
+    }
+
+    public static class Sum extends Function {
+        private double summary;
+
+        public Sum() {
+            this.summary = 0.0;
+        }
+
+        @Override
+        public void run(double v,int count) {
+            this.incrCount(count);
+            this.summary += v;
+        }
+
+        @Override
+        public double result() {
+            return this.summary;
+        }
+    }
+
+    public static class Count extends Sum {
+
+        public Count() {
+            super();
+        }
+
+    }
 }
\ No newline at end of file
index c6d1861..4f9330d 100755 (executable)
@@ -21,55 +21,55 @@ import org.apache.eagle.query.aggregate.AggregateFunctionType;
 import java.util.HashMap;
 import java.util.Map;
 
-public abstract class FunctionFactory{
-       public abstract Function createFunction();
+public abstract class FunctionFactory {
+    public abstract Function createFunction();
 
-       public static class AvgFactory extends FunctionFactory {
-               @Override
-               public Function createFunction(){
-                       return new Function.Avg();
-               }
-       }
+    public static class AvgFactory extends FunctionFactory {
+        @Override
+        public Function createFunction() {
+            return new Function.Avg();
+        }
+    }
 
-       public static class MaxFactory extends FunctionFactory {
-               @Override
-               public Function createFunction(){
-                       return new Function.Max();
-               }
-       }
+    public static class MaxFactory extends FunctionFactory {
+        @Override
+        public Function createFunction() {
+            return new Function.Max();
+        }
+    }
 
-       public static class MinFactory extends FunctionFactory {
-               @Override
-               public Function createFunction(){
-                       return new Function.Min();
-               }
-       }
+    public static class MinFactory extends FunctionFactory {
+        @Override
+        public Function createFunction() {
+            return new Function.Min();
+        }
+    }
 
-       public static class CountFactory extends FunctionFactory {
-               @Override
-               public Function createFunction(){
-                       return new Function.Count();
-               }
-       }
+    public static class CountFactory extends FunctionFactory {
+        @Override
+        public Function createFunction() {
+            return new Function.Count();
+        }
+    }
 
-       public static class SumFactory extends FunctionFactory {
-               @Override
-               public Function createFunction(){
-                       return new Function.Sum();
-               }
-       }
+    public static class SumFactory extends FunctionFactory {
+        @Override
+        public Function createFunction() {
+            return new Function.Sum();
+        }
+    }
 
-       public static FunctionFactory locateFunctionFactory(AggregateFunctionType funcType){
-               return _functionFactories.get(funcType.name());
-       }
+    public static FunctionFactory locateFunctionFactory(AggregateFunctionType funcType) {
+        return functionFactories.get(funcType.name());
+    }
 
-       private static Map<String, FunctionFactory> _functionFactories = new HashMap<String, FunctionFactory>();
-       static{
-               _functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
-               _functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
-               _functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
-               _functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
-               _functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
-       }
+    private static Map<String, FunctionFactory> functionFactories = new HashMap<String, FunctionFactory>();
+
+    static {
+        functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
+        functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
+        functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
+        functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
+        functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
+    }
 }
-       
\ No newline at end of file
index c8ed260..b6970d0 100755 (executable)
@@ -38,112 +38,115 @@ import java.util.ListIterator;
  * </pre>
  */
 public class GroupbyKey implements Writable {
-       private final WritableList<BytesWritable> value;
+    private final WritableList<BytesWritable> value;
 
-       public void addValue(byte[] value){
-               this.value.add(new BytesWritable(value));
-       }
-       public void addAll(List<BytesWritable> list){
-               this.value.addAll(list);
-       }
+    public void addValue(byte[] value) {
+        this.value.add(new BytesWritable(value));
+    }
 
-       public List<BytesWritable> getValue(){
-               return value;
-       }
+    public void addAll(List<BytesWritable> list) {
+        this.value.addAll(list);
+    }
 
-       /**
-        * empty constructor
-        */
-       public GroupbyKey(){
-               this.value = new WritableList<BytesWritable>(BytesWritable.class);
-       }
+    public List<BytesWritable> getValue() {
+        return value;
+    }
 
-       /**
-        * clear for reuse
-        */
-       public void clear(){
-               value.clear();
-       }
+    /**
+     * empty constructor
+     */
+    public GroupbyKey() {
+        this.value = new WritableList<BytesWritable>(BytesWritable.class);
+    }
 
-       /**
-        * copy constructor
-        * @param key
-        */
-       public GroupbyKey(GroupbyKey key){
-               this();
-               ListIterator<BytesWritable> it = key.value.listIterator();
-//             ListIterator<byte[]> it = key.value.listIterator();
-               while(it.hasNext()){
-                       this.value.add(it.next());
-               }
-       }
+    /**
+     * clear for reuse
+     */
+    public void clear() {
+        value.clear();
+    }
 
-       public GroupbyKey(List<byte[]> bytes){
-               this();
-               for(byte[] bt:bytes){
-                       this.addValue(bt);
-               }
-       }
+    /**
+     * copy constructor
+     * @param key
+     */
+    public GroupbyKey(GroupbyKey key) {
+        this();
+        ListIterator<BytesWritable> it = key.value.listIterator();
+        // ListIterator<byte[]> it = key.value.listIterator();
+        while (it.hasNext()) {
+            this.value.add(it.next());
+        }
+    }
 
-       @Override
-       public boolean equals(Object obj){
-               if(obj == this)
-                       return true;
-               if(!(obj instanceof GroupbyKey)){
-                       return false;
-               }
-               GroupbyKey that = (GroupbyKey)obj;
-               ListIterator<BytesWritable> e1 = this.value.listIterator();
-               ListIterator<BytesWritable> e2 = that.value.listIterator();
-               while(e1.hasNext() && e2.hasNext()){
-                       if(!Arrays.equals(e1.next().getBytes(), e2.next().getBytes()))
-                               return false;
-               }
-               return !(e1.hasNext() || e2.hasNext());
-       }
+    public GroupbyKey(List<byte[]> bytes) {
+        this();
+        for (byte[] bt:bytes) {
+            this.addValue(bt);
+        }
+    }
 
-       @Override
-       public String toString() {
-               List<String> items = new ArrayList<>(this.value.size());
-               ListIterator<BytesWritable> iterator = this.value.listIterator();
-               while(iterator.hasNext()){
-                       items.add(iterator.next().toString());
-               }
-               return String.format("%s(%s)",this.getClass().getSimpleName(),StringUtils.join(items,","));
-       }
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof GroupbyKey)) {
+            return false;
+        }
+        GroupbyKey that = (GroupbyKey)obj;
+        ListIterator<BytesWritable> e1 = this.value.listIterator();
+        ListIterator<BytesWritable> e2 = that.value.listIterator();
+        while (e1.hasNext() && e2.hasNext()) {
+            if (!Arrays.equals(e1.next().getBytes(), e2.next().getBytes())) {
+                return false;
+            }
+        }
+        return !(e1.hasNext() || e2.hasNext());
+    }
 
-       @Override
-       public int hashCode(){
-               ListIterator<BytesWritable> e1 = this.value.listIterator();
-               int hash = 0xFFFFFFFF;
-               while(e1.hasNext()){
-                       hash ^= Arrays.hashCode(e1.next().getBytes());
-               }
-               return hash;
-       }
+    @Override
+    public String toString() {
+        List<String> items = new ArrayList<>(this.value.size());
+        ListIterator<BytesWritable> iterator = this.value.listIterator();
+        while (iterator.hasNext()) {
+            items.add(iterator.next().toString());
+        }
+        return String.format("%s(%s)",this.getClass().getSimpleName(),StringUtils.join(items,","));
+    }
 
-       /**
-        * Serialize the fields of this object to <code>out</code>.
-        *
-        * @param out <code>DataOuput</code> to serialize this object into.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void write(DataOutput out) throws IOException {
-               this.value.write(out);
-       }
+    @Override
+    public int hashCode() {
+        ListIterator<BytesWritable> e1 = this.value.listIterator();
+        int hash = 0xFFFFFFFF;
+        while (e1.hasNext()) {
+            hash ^= Arrays.hashCode(e1.next().getBytes());
+        }
+        return hash;
+    }
 
-       /**
-        * Deserialize the fields of this object from <code>in</code>.
-        * <p/>
-        * <p>For efficiency, implementations should attempt to re-use storage in the
-        * existing object where possible.</p>
-        *
-        * @param in <code>DataInput</code> to deseriablize this object from.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.value.readFields(in);
-       }
+    /**
+     * Serialize the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOuput</code> to serialize this object into.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.value.write(out);
+    }
+
+    /**
+     * Deserialize the fields of this object from <code>in</code>.
+     * <p/>
+     * <p>For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.</p>
+     *
+     * @param in <code>DataInput</code> to deseriablize this object from.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.value.readFields(in);
+    }
 }
index 7e20029..723df2c 100755 (executable)
@@ -19,7 +19,7 @@ package org.apache.eagle.query.aggregate.raw;
 import java.util.List;
 
 /**
- * The generic interface to unify the GroupbyKeyValue-based results of different 
+ * The generic interface to unify the GroupbyKeyValue-based results of different
  * business logic aggregates like RawAggregator or TimeSeriesAggregator
  *
  * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator
@@ -29,11 +29,11 @@ import java.util.List;
  *
  */
 public interface GroupbyKeyAggregatable {
-       /**
-        * @see RawAggregator#getGroupbyKeyValues()
-        * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator#getGroupbyKeyValues()
-        * 
-        * @return
-        */
-       public List<GroupbyKeyValue> getGroupbyKeyValues();
+    /**
+     * @see RawAggregator#getGroupbyKeyValues()
+     * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator#getGroupbyKeyValues()
+     *
+     * @return
+     */
+    List<GroupbyKeyValue> getGroupbyKeyValues();
 }
\ No newline at end of file
index f976c8c..ad1f755 100755 (executable)
@@ -23,21 +23,25 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.ListIterator;
 
-public class GroupbyKeyComparator implements Comparator<GroupbyKey>{
-       @Override 
-    public int compare(GroupbyKey key1, GroupbyKey key2){
-               List<BytesWritable> list1 = key1.getValue();
-               List<BytesWritable> list2 = key2.getValue();
-               
-               if(list1 == null || list2 == null || list1.size() != list2.size())
-                       throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
-               ListIterator<BytesWritable> e1 = list1.listIterator();
-               ListIterator<BytesWritable> e2 = list2.listIterator();
-               while(e1.hasNext() && e2.hasNext()){
-                       int r = Bytes.compareTo(e1.next().copyBytes(), e2.next().copyBytes());
-                       if(r != 0)
-                               return r;
-               }
-               return 0;
-       }
+public class GroupbyKeyComparator implements Comparator<GroupbyKey> {
+
+    @Override
+    public int compare(GroupbyKey key1, GroupbyKey key2) {
+        List<BytesWritable> list1 = key1.getValue();
+        List<BytesWritable> list2 = key2.getValue();
+
+        if (list1 == null || list2 == null || list1.size() != list2.size()) {
+            throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
+        }
+        ListIterator<BytesWritable> e1 = list1.listIterator();
+        ListIterator<BytesWritable> e2 = list2.listIterator();
+        while (e1.hasNext() && e2.hasNext()) {
+            int r = Bytes.compareTo(e1.next().copyBytes(), e2.next().copyBytes());
+            if (r != 0) {
+                return r;
+            }
+        }
+        return 0;
+    }
+
 }
index 2256761..8420b11 100755 (executable)
@@ -36,56 +36,59 @@ import java.io.IOException;
  * @since : 11/4/14,2014
  */
 public class GroupbyKeyValue implements Writable {
-       private GroupbyKey key;
-       private GroupbyValue value;
-       public GroupbyKeyValue(){
-               this.key = new GroupbyKey();
-               this.value = new GroupbyValue();
-       }
-       public GroupbyKeyValue(GroupbyKey key,GroupbyValue value){
-               this.key = key;
-               this.value = value;
-       }
-       public GroupbyKey getKey() {
-               return key;
-       }
+    private GroupbyKey key;
+    private GroupbyValue value;
 
-       public void setKey(GroupbyKey key) {
-               this.key = key;
-       }
+    public GroupbyKeyValue() {
+        this.key = new GroupbyKey();
+        this.value = new GroupbyValue();
+    }
 
-       public GroupbyValue getValue() {
-               return value;
-       }
+    public GroupbyKeyValue(GroupbyKey key,GroupbyValue value) {
+        this.key = key;
+        this.value = value;
+    }
 
-       public void setValue(GroupbyValue value) {
-               this.value = value;
-       }
+    public GroupbyKey getKey() {
+        return key;
+    }
 
-       /**
-        * Serialize the fields of this object to <code>out</code>.
-        *
-        * @param out <code>DataOuput</code> to serialize this object into.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void write(DataOutput out) throws IOException {
-               this.key.write(out);
-               this.value.write(out);
-       }
+    public void setKey(GroupbyKey key) {
+        this.key = key;
+    }
 
-       /**
-        * Deserialize the fields of this object from <code>in</code>.
-        * <p/>
-        * <p>For efficiency, implementations should attempt to re-use storage in the
-        * existing object where possible.</p>
-        *
-        * @param in <code>DataInput</code> to deseriablize this object from.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.key.readFields(in);
-               this.value.readFields(in);
-       }
+    public GroupbyValue getValue() {
+        return value;
+    }
+
+    public void setValue(GroupbyValue value) {
+        this.value = value;
+    }
+
+    /**
+     * Serialize the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOuput</code> to serialize this object into.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.key.write(out);
+        this.value.write(out);
+    }
+
+    /**
+     * Deserialize the fields of this object from <code>in</code>.
+     * <p/>
+     * <p>For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.</p>
+     *
+     * @param in <code>DataInput</code> to deseriablize this object from.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.key.readFields(in);
+        this.value.readFields(in);
+    }
 }
index b7f2c43..20679b9 100755 (executable)
@@ -41,99 +41,105 @@ import java.io.IOException;
  *
  * @since : 11/4/14,2014
  */
-public class GroupbyValue implements Writable{
-       private final WritableList<DoubleWritable> value;
-       private WritableList<BytesWritable> meta;
-       private int initialCapacity=1;
-       public GroupbyValue(){
-               this(1);
-       }
-       /**
-        * Constructs an empty list with the specified initial capacity.
-        *
-        * @param   initialCapacity   the initial capacity of the list
-        * @exception IllegalArgumentException if the specified initial capacity
-        *            is negative
-        */
-       public GroupbyValue(int initialCapacity ){
-               this.initialCapacity = initialCapacity;
-               this.value = new WritableList<DoubleWritable>(DoubleWritable.class,this.initialCapacity);
-               this.meta = new WritableList<BytesWritable>(BytesWritable.class,this.initialCapacity);
-       }
-
-       public WritableList<DoubleWritable> getValue(){
-               return this.value;
-       }
-
-       public WritableList<BytesWritable> getMeta(){
-               return this.meta;
-       }
-
-       public DoubleWritable get(int index){
-               return this.value.get(index);
-       }
-
-       public BytesWritable getMeta(int index){
-               if(this.meta==null) return null;
-               return this.meta.get(index);
-       }
-
-       // Values
-       public void add(DoubleWritable value){
-               this.value.add(value);
-       }
-       public void add(Double value){
-               this.value.add(new DoubleWritable(value));
-       }
-
-       public void set(int index,DoubleWritable value){
-               this.value.set(index, value);
-       }
-
-       //////////////
-       // Meta
-       /////////////
-       public void addMeta(BytesWritable meta){
-               this.meta.add(meta);
-       }
-
-       public void addMeta(int meta){
-               this.meta.add(new BytesWritable(ByteUtil.intToBytes(meta)));
-       }
-
-       public void setMeta(int index,BytesWritable meta){
-               this.meta.set(index,meta);
-       }
-       public void setMeta(int index,int meta){
-               this.meta.set(index, new BytesWritable(ByteUtil.intToBytes(meta)));
-       }
-
-       /**
-        * Serialize the fields of this object to <code>out</code>.
-        *
-        * @param out <code>DataOuput</code> to serialize this object into.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void write(DataOutput out) throws IOException {
-               out.writeInt(this.initialCapacity);
-               this.value.write(out);
-               this.meta.write(out);
-       }
-
-       /**
-        * Deserialize the fields of this object from <code>in</code>.
-        * <p/>
-        * <p>For efficiency, implementations should attempt to re-use storage in the
-        * existing object where possible.</p>
-        *
-        * @param in <code>DataInput</code> to deseriablize this object from.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.initialCapacity = in.readInt();
-               this.value.readFields(in);
-               this.meta.readFields(in);
-       }
+public class GroupbyValue implements Writable {
+    private final WritableList<DoubleWritable> value;
+    private WritableList<BytesWritable> meta;
+    private int initialCapacity = 1;
+
+    public GroupbyValue() {
+        this(1);
+    }
+
+    /**
+     * Constructs an empty list with the specified initial capacity.
+     *
+     * @param   initialCapacity   the initial capacity of the list
+     * @exception IllegalArgumentException if the specified initial capacity
+     *            is negative
+     */
+    public GroupbyValue(int initialCapacity) {
+        this.initialCapacity = initialCapacity;
+        this.value = new WritableList<DoubleWritable>(DoubleWritable.class,this.initialCapacity);
+        this.meta = new WritableList<BytesWritable>(BytesWritable.class,this.initialCapacity);
+    }
+
+    public WritableList<DoubleWritable> getValue() {
+        return this.value;
+    }
+
+    public DoubleWritable get(int index) {
+        return this.value.get(index);
+    }
+
+    public WritableList<BytesWritable> getMeta() {
+        return this.meta;
+    }
+
+    public BytesWritable getMeta(int index) {
+        if (this.meta == null) {
+            return null;
+        }
+        return this.meta.get(index);
+    }
+
+    // Values
+    public void add(DoubleWritable value) {
+        this.value.add(value);
+    }
+
+    public void add(Double value) {
+        this.value.add(new DoubleWritable(value));
+    }
+
+    public void set(int index,DoubleWritable value) {
+        this.value.set(index, value);
+    }
+
+    //////////////
+    // Meta
+    /////////////
+    public void addMeta(BytesWritable meta) {
+        this.meta.add(meta);
+    }
+
+    public void addMeta(int meta) {
+        this.meta.add(new BytesWritable(ByteUtil.intToBytes(meta)));
+    }
+
+    public void setMeta(int index,BytesWritable meta) {
+        this.meta.set(index,meta);
+    }
+
+    public void setMeta(int index,int meta) {
+        this.meta.set(index, new BytesWritable(ByteUtil.intToBytes(meta)));
+    }
+
+    /**
+     * Serialize the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOuput</code> to serialize this object into.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.initialCapacity);
+        this.value.write(out);
+        this.meta.write(out);
+    }
+
+    /**
+     * Deserialize the fields of this object from <code>in</code>.
+     * <p/>
+     * <p>For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.</p>
+     *
+     * @param in <code>DataInput</code> to deseriablize this object from.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.initialCapacity = in.readInt();
+        this.value.readFields(in);
+        this.meta.readFields(in);
+    }
 }
\ No newline at end of file
index 0468074..5883b20 100755 (executable)
@@ -25,47 +25,47 @@ import java.util.ListIterator;
 import java.util.Map;
 
 public class RawAggregator implements QualifierCreationListener,GroupbyKeyAggregatable {
-       private List<String> groupbyFields;
-       private GroupbyKey key;
-       private static final byte[] UNASSIGNED = "unassigned".getBytes();
-       private RawGroupbyBucket bucket;
+    private List<String> groupbyFields;
+    private GroupbyKey key;
+    private static final byte[] UNASSIGNED = "unassigned".getBytes();
+    private RawGroupbyBucket bucket;
 
-       public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed){
-               this.groupbyFields = groupbyFields;
-               key = new GroupbyKey();
-               bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed);
-       }
+    public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed) {
+        this.groupbyFields = groupbyFields;
+        key = new GroupbyKey();
+        bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed);
+    }
 
-       @Override
-       public void qualifierCreated(Map<String, byte[]> qualifiers){
-               key.clear();
-               ListIterator<String> it = groupbyFields.listIterator();
-               while(it.hasNext()){
-                       byte[] groupbyFieldValue = qualifiers.get(it.next());
-                       if(groupbyFieldValue == null){
-                               key.addValue(UNASSIGNED);
-                       }else{
-                               key.addValue(groupbyFieldValue);
-                       }
-               }
-               GroupbyKey newKey = null;
-               if(bucket.exists(key)){
-                       newKey = key;
-               }else{
-                       newKey = new GroupbyKey(key);
-               }
-               
-               bucket.addDatapoint(newKey, qualifiers);
-       }
+    @Override
+    public void qualifierCreated(Map<String, byte[]> qualifiers) {
+        key.clear();
+        ListIterator<String> it = groupbyFields.listIterator();
+        while (it.hasNext()) {
+            byte[] groupbyFieldValue = qualifiers.get(it.next());
+            if (groupbyFieldValue == null) {
+                key.addValue(UNASSIGNED);
+            } else {
+                key.addValue(groupbyFieldValue);
+            }
+        }
+        GroupbyKey newKey = null;
+        if (bucket.exists(key)) {
+            newKey = key;
+        } else {
+            newKey = new GroupbyKey(key);
+        }
 
-       /**
-        * @return
-        */
-       public Map<List<String>, List<Double>> result(){
-               return bucket.result();
-       }
+        bucket.addDatapoint(newKey, qualifiers);
+    }
 
-       public List<GroupbyKeyValue> getGroupbyKeyValues(){
-               return bucket.groupbyKeyValues();
-       }
+    /**
+     * @return
+     */
+    public Map<List<String>, List<Double>> result() {
+        return bucket.result();
+    }
+
+    public List<GroupbyKeyValue> getGroupbyKeyValues() {
+        return bucket.groupbyKeyValues();
+    }
 }
index 47b84a0..b0aa79c 100755 (executable)
@@ -30,167 +30,174 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 public class RawGroupbyBucket {
-       private final static Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class);
+    private static final Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class);
 
-       private List<String> aggregatedFields;
-       private EntityDefinition entityDefinition;
+    private List<String> aggregatedFields;
+    private EntityDefinition entityDefinition;
 
-       
-       private List<AggregateFunctionType> types;
-       private SortedMap<GroupbyKey, List<Function>> group2FunctionMap =
-                       new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator());
 
-       public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed){
-               this.types = types;
-               this.aggregatedFields = aggregatedFields;
-               this.entityDefinition = ed;
-       }
+    private List<AggregateFunctionType> types;
+    private SortedMap<GroupbyKey, List<Function>> group2FunctionMap =
+        new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator());
 
-       public boolean exists(GroupbyKey key){
-               return group2FunctionMap.containsKey(key);
-       }
+    public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed) {
+        this.types = types;
+        this.aggregatedFields = aggregatedFields;
+        this.entityDefinition = ed;
+    }
 
-       public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values){
-               // locate groupby bucket
-               List<Function> functions = group2FunctionMap.get(groupbyKey);
-               if(functions == null){
-                       functions = new ArrayList<Function>();
-                       for(AggregateFunctionType type : types){
-                               FunctionFactory ff = FunctionFactory.locateFunctionFactory(type);
-                               if(ff == null){
-                                       LOG.error("FunctionFactory of AggregationFunctionType:"+type+" is null");
-                               }else{
-                                       functions.add(ff.createFunction());
-                               }
-                       }
-                       group2FunctionMap.put(groupbyKey, functions);
-               }
-               ListIterator<Function> e1 = functions.listIterator();
-               ListIterator<String> e2 = aggregatedFields.listIterator();
-               while(e1.hasNext() && e2.hasNext()){
-                       Function f = e1.next();
-                       String aggregatedField = e2.next();
-                       byte[] v = values.get(aggregatedField);
-                       if(f instanceof Function.Count){ // handle count
-                               if(entityDefinition.getMetricDefinition()==null) {
-                                       f.run(1.0);
-                                       continue;
-                               }else if(v == null){
-                                       aggregatedField = GenericMetricEntity.VALUE_FIELD;
-                                       v = values.get(aggregatedField);
-                               }
-                       }
-                       if(v != null){
-                               Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField);
-                               EntitySerDeser<?> serDeser = q.getSerDeser();
-                               // double d = 0.0;
-                               if(serDeser instanceof IntSerDeser){
-                                       double d= (Integer)serDeser.deserialize(v);
-                                       f.run(d);
-                               }else if(serDeser instanceof LongSerDeser){
-                                       double d = (Long)serDeser.deserialize(v);
-                                       f.run(d);
-                               }else if(serDeser instanceof DoubleSerDeser){
-                                       double d = (Double)serDeser.deserialize(v);
-                                       f.run(d);
-                               // TODO: support numeric array type that is not metric
-                               }else if(serDeser instanceof DoubleArraySerDeser){
-                                       double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v);
-                                       if(f instanceof Function.Count){
-                                               f.run(d.length);
-                                       } else {
-                                               for(double i:d) f.run(i);
-                                       }
-                               }else if(serDeser instanceof IntArraySerDeser){
-                                       int[] d = ((IntArraySerDeser) serDeser).deserialize(v);
-                                       if(f instanceof Function.Count){
-                                               f.run(d.length);
-                                       }else{
-                                               for(int i:d) f.run(i);
-                                       }
-                               }else{
-                                       if(LOG.isDebugEnabled()) LOG.debug("EntitySerDeser of field "+aggregatedField+" is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0");
-                               }
-                       }else if(TokenConstant.isExpression(aggregatedField)){
-                               String expression = TokenConstant.parseExpressionContent(aggregatedField);
-                               try {
-                                       Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition);
-                                       if(entityDefinition.getMetricDefinition() == null) {
-                                               double value = ExpressionParser.eval(expression,doubleMap);
-                                               // LOG.info("DEBUG: Eval "+expression +" = "+value);
-                                               f.run(value);
-                                       }else{
-                                               Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
-                                               EntitySerDeser _serDeser = qualifier.getSerDeser();
-                                               byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD);
-                                               if( _serDeser instanceof DoubleArraySerDeser){
-                                                       double[] d = (double[]) _serDeser.deserialize(valueBytes);
-                                                       if(f instanceof Function.Count) {
-                                                               f.run(d.length);
-                                                       }else{
-                                                               for(double i:d){
-                                                                       doubleMap.put(GenericMetricEntity.VALUE_FIELD,i);
-                                                                       f.run(ExpressionParser.eval(expression, doubleMap));
-                                                               }
-                                                       }
-                                               }else if(_serDeser instanceof IntArraySerDeser){
-                                                       int[] d = (int[]) _serDeser.deserialize(valueBytes);
-                                                       if(f instanceof Function.Count) {
-                                                               f.run(d.length);
-                                                       }else {
-                                                               for (double i : d) {
-                                                                       doubleMap.put(GenericMetricEntity.VALUE_FIELD, i);
-                                                                       f.run(ExpressionParser.eval(expression, doubleMap));
-                                                               }
-                                                       }
-                                               }else{
-                                                       double value = ExpressionParser.eval(expression,doubleMap);
-                                                       f.run(value);
-                                               }
-                                       }
-                               } catch (Exception e) {
-                                       LOG.error("Got exception to evaluate expression: "+expression+", exception: "+e.getMessage(),e);
-                               }
-                       }
-               }
-       }
+    public boolean exists(GroupbyKey key) {
+        return group2FunctionMap.containsKey(key);
+    }
 
-       /**
-        * expensive operation - create objects and format the result
-        * @return
-        */
-       public List<GroupbyKeyValue> groupbyKeyValues(){
-               List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
-               for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
-                       GroupbyValue value = new GroupbyValue();
-                       for(Function f : entry.getValue()){
-                               value.add(new DoubleWritable(f.result()));
-                               value.addMeta(f.count());
-                       }
-                       results.add(new GroupbyKeyValue(entry.getKey(),value));
-               }
-               return results;
-       }
+    public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values) {
+        // locate groupby bucket
+        List<Function> functions = group2FunctionMap.get(groupbyKey);
+        if (functions == null) {
+            functions = new ArrayList<Function>();
+            for (AggregateFunctionType type : types) {
+                FunctionFactory ff = FunctionFactory.locateFunctionFactory(type);
+                if (ff == null) {
+                    LOG.error("FunctionFactory of AggregationFunctionType:" + type + " is null");
+                } else {
+                    functions.add(ff.createFunction());
+                }
+            }
+            group2FunctionMap.put(groupbyKey, functions);
+        }
+        ListIterator<Function> e1 = functions.listIterator();
+        ListIterator<String> e2 = aggregatedFields.listIterator();
+        while (e1.hasNext() && e2.hasNext()) {
+            Function f = e1.next();
+            String aggregatedField = e2.next();
+            byte[] v = values.get(aggregatedField);
+            if (f instanceof Function.Count) { // handle count
+                if (entityDefinition.getMetricDefinition() == null) {
+                    f.run(1.0);
+                    continue;
+                } else if (v == null) {
+                    aggregatedField = GenericMetricEntity.VALUE_FIELD;
+                    v = values.get(aggregatedField);
+                }
+            }
+            if (v != null) {
+                Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField);
+                EntitySerDeser<?> serDeser = q.getSerDeser();
+                // double d = 0.0;
+                if (serDeser instanceof IntSerDeser) {
+                    double d = (Integer)serDeser.deserialize(v);
+                    f.run(d);
+                } else if (serDeser instanceof LongSerDeser) {
+                    double d = (Long)serDeser.deserialize(v);
+                    f.run(d);
+                } else if (serDeser instanceof DoubleSerDeser) {
+                    double d = (Double)serDeser.deserialize(v);
+                    f.run(d);
+                    // TODO: support numeric array type that is not metric
+                } else if (serDeser instanceof DoubleArraySerDeser) {
+                    double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v);
+                    if (f instanceof Function.Count) {
+                        f.run(d.length);
+                    } else {
+                        for (double i:d) {
+                            f.run(i);
+                        }
+                    }
+                } else if (serDeser instanceof IntArraySerDeser) {
+                    int[] d = ((IntArraySerDeser) serDeser).deserialize(v);
+                    if (f instanceof Function.Count) {
+                        f.run(d.length);
+                    } else {
+                        for (int i:d) {
+                            f.run(i);
+                        }
+                    }
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("EntitySerDeser of field " + aggregatedField
+                                  + " is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0");
+                    }
+                }
+            } else if (TokenConstant.isExpression(aggregatedField)) {
+                String expression = TokenConstant.parseExpressionContent(aggregatedField);
+                try {
+                    Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition);
+                    if (entityDefinition.getMetricDefinition() == null) {
+                        double value = ExpressionParser.eval(expression,doubleMap);
+                        // LOG.info("DEBUG: Eval "+expression +" = "+value);
+                        f.run(value);
+                    } else {
+                        Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
+                        EntitySerDeser _serDeser = qualifier.getSerDeser();
+                        byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD);
+                        if ( _serDeser instanceof DoubleArraySerDeser) {
+                            double[] d = (double[]) _serDeser.deserialize(valueBytes);
+                            if (f instanceof Function.Count) {
+                                f.run(d.length);
+                            } else {
+                                for (double i:d) {
+                                    doubleMap.put(GenericMetricEntity.VALUE_FIELD,i);
+                                    f.run(ExpressionParser.eval(expression, doubleMap));
+                                }
+                            }
+                        } else if (_serDeser instanceof IntArraySerDeser) {
+                            int[] d = (int[]) _serDeser.deserialize(valueBytes);
+                            if (f instanceof Function.Count) {
+                                f.run(d.length);
+                            } else {
+                                for (double i : d) {
+                                    doubleMap.put(GenericMetricEntity.VALUE_FIELD, i);
+                                    f.run(ExpressionParser.eval(expression, doubleMap));
+                                }
+                            }
+                        } else {
+                            double value = ExpressionParser.eval(expression,doubleMap);
+                            f.run(value);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("Got exception to evaluate expression: " + expression + ", exception: " + e.getMessage(), e);
+                }
+            }
+        }
+    }
 
-       /**
-        * expensive operation - create objects and format the result
-        * @return
-        */
-       public Map<List<String>, List<Double>> result(){
-               Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
-               for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
-                       List<Double> values = new ArrayList<Double>();
-                       for(Function f : entry.getValue()){
-                               values.add(f.result());
-                       }
-                       GroupbyKey key = entry.getKey();
-                       List<BytesWritable> list1 = key.getValue();
-                       List<String> list2 = new ArrayList<String>();
-                       for(BytesWritable e : list1){
-                               list2.add(new String(e.copyBytes()));
-                       }
-                       result.put(list2, values);
-               }
-               return result;
-       }
+    /**
+     * expensive operation - create objects and format the result
+     * @return
+     */
+    public List<GroupbyKeyValue> groupbyKeyValues() {
+        List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
+        for (Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+            GroupbyValue value = new GroupbyValue();
+            for (Function f : entry.getValue()) {
+                value.add(new DoubleWritable(f.result()));
+                value.addMeta(f.count());
+            }
+            results.add(new GroupbyKeyValue(entry.getKey(),value));
+        }
+        return results;
+    }
+
+    /**
+     * expensive operation - create objects and format the result
+     * @return
+     */
+    public Map<List<String>, List<Double>> result() {
+        Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
+        for (Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+            List<Double> values = new ArrayList<Double>();
+            for (Function f : entry.getValue()) {
+                values.add(f.result());
+            }
+            GroupbyKey key = entry.getKey();
+            List<BytesWritable> list1 = key.getValue();
+            List<String> list2 = new ArrayList<String>();
+            for (BytesWritable e : list1) {
+                list2.add(new String(e.copyBytes()));
+            }
+            result.put(list2, values);
+        }
+        return result;
+    }
 }
index f9932a5..f3d6afd 100755 (executable)
@@ -27,78 +27,75 @@ import java.util.ArrayList;
 /**
  * @since : 11/6/14,2014
  */
-public class WritableList<E extends Writable> extends ArrayList<E> implements Writable{
-       private Class<E> itemTypeClass;
+public class WritableList<E extends Writable> extends ArrayList<E> implements Writable {
+    private Class<E> itemTypeClass;
 
-       public WritableList(Class<E> typeClass){
-               this.itemTypeClass = typeClass;
-       }
+    public WritableList(Class<E> typeClass) {
+        this.itemTypeClass = typeClass;
+    }
 
-       public WritableList(Class<E> typeClass,int initialCapacity){
-               super(initialCapacity);
-               this.itemTypeClass = typeClass;
-       }
+    public WritableList(Class<E> typeClass,int initialCapacity) {
+        super(initialCapacity);
+        this.itemTypeClass = typeClass;
+    }
 
 
-       /**
-        * <h3> Get item class by </h3>
-        * <pre>
-        * (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-        * </pre>
-        */
-       @Deprecated
-       public WritableList(){
-               this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-       }
+    /**
+     * <h3> Get item class by </h3>
+     */
+    @Deprecated
+    public WritableList() {
+        this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+    }
 
-       private void check() throws IOException{
-               if(this.itemTypeClass == null){
-                       throw new IOException("Class Type of WritableArrayList<E extends Writable> is null");
-               }
-       }
+    private void check() throws IOException {
+        if (this.itemTypeClass == null) {
+            throw new IOException("Class Type of WritableArrayList<E extends Writable> is null");
+        }
+    }
 
-       public Class<E> getItemClass(){
-               return itemTypeClass;
-       }
+    public Class<E> getItemClass() {
+        return itemTypeClass;
+    }
 
-       /**
-        * Serialize the fields of this object to <code>out</code>.
-        *
-        * @param out <code>DataOuput</code> to serialize this object into.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void write(DataOutput out) throws IOException {
-               this.check();
-               out.writeInt(this.size());
-               for(Writable item: this){
-                       item.write(out);
-               }
-       }
+    /**
+     * Serialize the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOuput</code> to serialize this object into.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.check();
+        out.writeInt(this.size());
+        for (Writable item: this) {
+            item.write(out);
+        }
+    }
 
-       /**
-        * Deserialize the fields of this object from <code>in</code>.
-        * <p/>
-        * <p>For efficiency, implementations should attempt to re-use storage in the
-        * existing object where possible.</p>
-        *
-        * @param in <code>DataInput</code> to deseriablize this object from.
-        * @throws java.io.IOException
-        */
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.check();
-               int size = in.readInt();
-               for(int i=0;i<size;i++){
-                       try {
-                               E item = itemTypeClass.newInstance();
-                               item.readFields(in);
-                               this.add(item);
-                       } catch (InstantiationException e) {
-                               throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
-                       } catch (IllegalAccessException e) {
-                               throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
-                       }
-               }
-       }
+    /**
+     * Deserialize the fields of this object from <code>in</code>.
+     * <p/>
+     * <p>For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.</p>
+     *
+     * @param in <code>DataInput</code> to deseriablize this object from.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.check();
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            try {
+                E item = itemTypeClass.newInstance();
+                item.readFields(in);
+                this.add(item);
+            } catch (InstantiationException e) {
+                throw new IOException("Got exception to create instance for class: " + itemTypeClass + ": " + e.getMessage(), e);
+            } catch (IllegalAccessException e) {
+                throw new IOException("Got exception to create instance for class: " + itemTypeClass + ": " + e.getMessage(), e);
+            }
+        }
+    }
 }
index deb0838..a4c6d98 100755 (executable)
@@ -32,161 +32,163 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
-public abstract class AbstractAggregator implements Aggregator, EntityCreationListener{
-       private final static Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class);
+public abstract class AbstractAggregator implements Aggregator, EntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class);
 
-       private static final String UNASSIGNED = "unassigned";
-       protected List<String> groupbyFields;
-       protected List<AggregateFunctionType> aggregateFunctionTypes;
-       protected List<String> aggregatedFields;
-       // a cache to know immediately if groupby field should come from tags(true) or qualifiers(false)
-       private Boolean[] _groupbyFieldPlacementCache;
-       private Method[] _aggregateFieldReflectedMethodCache;
+    private static final String UNASSIGNED = "unassigned";
+    protected List<String> groupbyFields;
+    protected List<AggregateFunctionType> aggregateFunctionTypes;
+    protected List<String> aggregatedFields;
+    // a cache to know immediately if groupby field should come from tags(true) or qualifiers(false)
+    private Boolean[] groupbyFieldPlacementCache;
+    private Method[] aggregateFieldReflectedMethodCache;
 
-       public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-               this.groupbyFields = groupbyFields;
-               this.aggregateFunctionTypes = aggregateFuntionTypes;
-               this.aggregatedFields = aggregatedFields;
-               _aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()];
-               _groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()];
-       }
-       
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-               accumulate(entity);
-       }
-       
-       public abstract Object result();
-       
-       protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i){
-               String groupbyFieldValue = entity.getTags().get(groupbyField);
-               if(groupbyFieldValue != null){
-                       _groupbyFieldPlacementCache[i] = true;
-                       return groupbyFieldValue;
-               }
-               return null;
-       }
-       
-       protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i){
-               try{
-                       PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField);
-                       if(pd == null)
-                               return null;
-//                     _groupbyFieldPlacementCache.put(groupbyField, false);
-                       _groupbyFieldPlacementCache[i] = false;
-                       return (String)(pd.getReadMethod().invoke(entity));
-               }catch(NoSuchMethodException ex){
-                       return null;
-               }catch(InvocationTargetException ex){
-                       return null;
-               }catch(IllegalAccessException ex){
-                       return null;
-               }
-       }
-       
-       protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i){
-               Boolean placement = _groupbyFieldPlacementCache[i];
-               String groupbyFieldValue = null; 
-               if(placement != null){
-                       groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i); 
-               }else{
-                       groupbyFieldValue = createGroupFromTags(entity, groupbyField, i);
-                       if(groupbyFieldValue == null){
-                               groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i);
-                       }
-               }
-               groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue);
-               return groupbyFieldValue;
-       }
-       
-       /**
-        * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or 
-        * number of non-null field
-        * For other aggregation, like sum,min,max,avg, we should resort to qualifiers
-        * @param entity
-        * @return
-        */
-       protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception{
-               List<Double> values = new ArrayList<Double>();
-               int functionIndex = 0;
-               for(AggregateFunctionType type : aggregateFunctionTypes){
-                       if(type.name().equals(AggregateFunctionType.count.name())){
-                               values.add(new Double(1));
-                       }else{
-                               // find value in qualifier by checking java bean
-                               String aggregatedField = aggregatedFields.get(functionIndex);
-                               if(TokenConstant.isExpression(aggregatedField)){
-                                       try {
-                                               String expr = TokenConstant.parseExpressionContent(aggregatedField);
-                                               values.add(ExpressionParser.eval(expr, entity));
-                                       }catch (Exception ex){
-                                               LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex);
-                                               throw ex;
-                                       }
-                               }else {
-                                       try {
-                                               Method m = _aggregateFieldReflectedMethodCache[functionIndex];
-                                               if (m == null) {
-//                                             pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField);
-//                                             if (pd == null) {
-//                                                     final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName();
-//                                                     logger.error(errMsg);
-//                                                     throw new Exception(errMsg);
-//                                             }
-//                                             Object obj = pd.getReadMethod().invoke(entity);
-                                                       String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1);
-                                                       m = entity.getClass().getMethod("get" + tmp);
-                                                       _aggregateFieldReflectedMethodCache[functionIndex] = m;
-                                               }
-                                               Object obj = m.invoke(entity);
-                                               values.add(numberToDouble(obj));
-                                       } catch (Exception ex) {
-                                               LOG.error("Cannot do aggregation for field " + aggregatedField, ex);
-                                               throw ex;
-                                       }
-                               }
-                       }
-                       functionIndex++;
-               }
-               return values;
-       }
-       
-       /**
-        * TODO this is a hack, we need elegant way to convert type to a broad precision
+    public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+        this.groupbyFields = groupbyFields;
+        this.aggregateFunctionTypes = aggregateFuntionTypes;
+        this.aggregatedFields = aggregatedFields;
+        aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()];
+        groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()];
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+        accumulate(entity);
+    }
+
+    public abstract Object result();
+
+    protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i) {
+        String groupbyFieldValue = entity.getTags().get(groupbyField);
+        if (groupbyFieldValue != null) {
+            groupbyFieldPlacementCache[i] = true;
+            return groupbyFieldValue;
+        }
+        return null;
+    }
+
+    protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i) {
+        try {
+            PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField);
+            if (pd == null) {
+                return null;
+            }
+            //          groupbyFieldPlacementCache.put(groupbyField, false);
+            groupbyFieldPlacementCache[i] = false;
+            return (String)(pd.getReadMethod().invoke(entity));
+        } catch (NoSuchMethodException ex) {
+            return null;
+        } catch (InvocationTargetException ex) {
+            return null;
+        } catch (IllegalAccessException ex) {
+            return null;
+        }
+    }
+
+    protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i) {
+        Boolean placement = groupbyFieldPlacementCache[i];
+        String groupbyFieldValue = null;
+        if (placement != null) {
+            groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i);
+        } else {
+            groupbyFieldValue = createGroupFromTags(entity, groupbyField, i);
+            if (groupbyFieldValue == null) {
+                groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i);
+            }
+        }
+        groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue);
+        return groupbyFieldValue;
+    }
+
+    /**
+     * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or
+     * number of non-null field
+     * For other aggregation, like sum,min,max,avg, we should resort to qualifiers
+     * @param entity
+     * @return
+     */
+    protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception {
+        List<Double> values = new ArrayList<Double>();
+        int functionIndex = 0;
+        for (AggregateFunctionType type : aggregateFunctionTypes) {
+            if (type.name().equals(AggregateFunctionType.count.name())) {
+                values.add(new Double(1));
+            } else {
+                // find value in qualifier by checking java bean
+                String aggregatedField = aggregatedFields.get(functionIndex);
+                if (TokenConstant.isExpression(aggregatedField)) {
+                    try {
+                        String expr = TokenConstant.parseExpressionContent(aggregatedField);
+                        values.add(ExpressionParser.eval(expr, entity));
+                    } catch (Exception ex) {
+                        LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex);
+                        throw ex;
+                    }
+                } else {
+                    try {
+                        Method m = aggregateFieldReflectedMethodCache[functionIndex];
+                        if (m == null) {
+                            //                      pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField);
+                            //                      if (pd == null) {
+                            //                          final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName();
+                            //                          logger.error(errMsg);
+                            //                          throw new Exception(errMsg);
+                            //                      }
+                            //                      Object obj = pd.getReadMethod().invoke(entity);
+                            String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1);
+                            m = entity.getClass().getMethod("get" + tmp);
+                            aggregateFieldReflectedMethodCache[functionIndex] = m;
+                        }
+                        Object obj = m.invoke(entity);
+                        values.add(numberToDouble(obj));
+                    } catch (Exception ex) {
+                        LOG.error("Cannot do aggregation for field " + aggregatedField, ex);
+                        throw ex;
+                    }
+                }
+            }
+            functionIndex++;
+        }
+        return values;
+    }
+
+    /**
+     * TODO this is a hack, we need elegant way to convert type to a broad precision
      *
-        * @param obj
-        * @return
-        */
-       protected Double numberToDouble(Object obj){
-               if(obj instanceof Double)
-                       return (Double)obj;
-               if(obj instanceof Integer){
-                       return new Double(((Integer)obj).doubleValue());
-               }
-               if(obj instanceof Long){
-                       return new Double(((Long)obj).doubleValue());
-               }
-               // TODO hack to support string field for demo purpose, should be removed
-               if(obj == null){
-                       return new Double(0.0);
-               }
-               if(obj instanceof String){
-                       try{
-                               return new Double((String)obj);
-                       }catch(Exception ex){
-                               LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex);
-                               return new Double(0.0);
-                       }
-               }
-               if(obj instanceof double[]){
-                       double[] value = (double[]) obj;
-                       if(value.length > 0){
-                               return new Double(value[0]);
-                       }else{
-                               return new Double(0.0);
-                       }
-               }
-               
-               throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double");
-       }
+     * @param obj
+     * @return
+     */
+    protected Double numberToDouble(Object obj) {
+        if (obj instanceof Double) {
+            return (Double)obj;
+        }
+        if (obj instanceof Integer) {
+            return new Double(((Integer)obj).doubleValue());
+        }
+        if (obj instanceof Long) {
+            return new Double(((Long)obj).doubleValue());
+        }
+        // TODO hack to support string field for demo purpose, should be removed
+        if (obj == null) {
+            return new Double(0.0);
+        }
+        if (obj instanceof String) {
+            try {
+                return new Double((String)obj);
+            } catch (Exception ex) {
+                LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex);
+                return new Double(0.0);
+            }
+        }
+        if (obj instanceof double[]) {
+            double[] value = (double[]) obj;
+            if (value.length > 0) {
+                return new Double(value[0]);
+            } else {
+                return new Double(0.0);
+            }
+        }
+
+        throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double");
+    }
 }
index 1e70e91..db62cfc 100755 (executable)
@@ -25,5 +25,5 @@ public interface Aggregator {
      * @param entity accumulated entity instance
      * @throws Exception
      */
-       public void accumulate(TaggedLogAPIEntity entity) throws Exception;
+    void accumulate(TaggedLogAPIEntity entity) throws Exception;
 }
\ No newline at end of file
index 7e35bec..a7a69e0 100644 (file)
@@ -19,7 +19,8 @@ package org.apache.eagle.query.aggregate.timeseries;
 import org.apache.eagle.log.entity.EntityCreationListener;
 
 public class EntityCreationListenerFactory {
-       public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener){
-               return new SynchronizedEntityCreationListener(listener);
-       }
+
+    public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener) {
+        return new SynchronizedEntityCreationListener(listener);
+    }
 }
index e12fea3..15e89ad 100755 (executable)
@@ -26,36 +26,36 @@ import org.apache.eagle.query.aggregate.AggregateFunctionType;
 /**
  * Not thread safe
  */
-public class FlatAggregator extends AbstractAggregator{
-       protected GroupbyBucket bucket;
+public class FlatAggregator extends AbstractAggregator {
+    protected GroupbyBucket bucket;
 
     /**
      * @param groupbyFields
      * @param aggregateFuntionTypes
      * @param aggregatedFields
      */
-       public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-               super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-               bucket = new GroupbyBucket(this.aggregateFunctionTypes);
-       }
-       
-       public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-               List<String> groupbyFieldValues = createGroup(entity);
-               List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-               bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
-       }
-       
-       public Map<List<String>, List<Double>> result(){
-               return bucket.result(); 
-       }
-       
-       protected List<String> createGroup(TaggedLogAPIEntity entity){
-               List<String> groupbyFieldValues = new ArrayList<String>();
-               int i = 0;
-               for(String groupbyField : groupbyFields){
-                       String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
-                       groupbyFieldValues.add(groupbyFieldValue);
-               }
-               return groupbyFieldValues;
-       }
+    public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+        bucket = new GroupbyBucket(this.aggregateFunctionTypes);
+    }
+
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<String> groupbyFieldValues = createGroup(entity);
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+    }
+
+    public Map<List<String>, List<Double>> result() {
+        return bucket.result();
+    }
+
+    protected List<String> createGroup(TaggedLogAPIEntity entity) {
+        List<String> groupbyFieldValues = new ArrayList<String>();
+        int i = 0;
+        for (String groupbyField : groupbyFields) {
+            String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
+            groupbyFieldValues.add(groupbyFieldValue);
+        }
+        return groupbyFieldValues;
+    }
 }
index ea57edb..93e65a9 100755 (executable)
@@ -31,211 +31,230 @@ import java.util.List;
 import java.util.Map;
 
 public class GroupbyBucket {
-       private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
-       
-       public static Map<String, FunctionFactory> _functionFactories = 
-                       new HashMap<>();
-    
-       // TODO put this logic to AggregatorFunctionType
-       static{
-               _functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
-               _functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
-               _functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
-               _functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
-               _functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
-       }
-       
-       private List<AggregateFunctionType> types;
-//     private SortedMap<List<String>, List<Function>> group2FunctionMap = 
-//                     new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
-       
-       private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
-       
-       public GroupbyBucket(List<AggregateFunctionType> types){
-               this.types = types;
-       }
-       
-       public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){
-               // LOG.info("DEBUG: addDatapoint: groupby=["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
-               
-               // locate groupby bucket
-               List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
-               if(functions == null){
-                       functions = new ArrayList<Function>();
-                       for(AggregateFunctionType type : types){
-                               functions.add(_functionFactories.get(type.name()).createFunction());
-                       }
-                       group2FunctionMap.put(groupbyFieldValues, functions);
-               }
-               int functionIndex = 0;
-               for(Double v : values){
-                       functions.get(functionIndex).run(v);
-                       functionIndex++;
-               }
-       }
-       
-       public Map<List<String>, List<Double>> result(){
-               Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
-               for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
-                       List<Double> values = new ArrayList<Double>();
-                       for(Function f : entry.getValue()){
-                               values.add(f.result());
-                       }
-                       result.put(entry.getKey(), values);
-               }
-               return result;
-       }
-
-       public List<GroupbyKeyValue> getGroupbyKeyValue(){
-               List<GroupbyKeyValue>  results = new ArrayList<GroupbyKeyValue>();
-               
-               for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
-                       GroupbyKey key = new GroupbyKey();
-                       for(String keyStr:entry.getKey()){
-                               try {
-                                       key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
-                               } catch (UnsupportedEncodingException e) {
-                                       LOG.error(e.getMessage(),e);
-                               }
-                       }
-                       GroupbyValue value = new GroupbyValue();
-                       for(Function f : entry.getValue()){
-                               value.add(f.result());
-                               value.addMeta(f.count());
-                       }
-                       results.add(new GroupbyKeyValue(key,value));
-               }
-               
-               return results;
-       }
-       
-       public static interface FunctionFactory{
-               public Function createFunction();
-       }
-       
-       public static abstract class Function{
-               protected int count;
-
-               public abstract void run(double v);
-               public abstract double result();
-               public int count(){
-                       return count;
-               }
-               public void incrCount(){
-                       count ++;
-               }
-       }
-
-       private static class CountFactory implements FunctionFactory{
-               @Override
-               public Function createFunction(){
-                       return new Count();
-               }
-       }
-       
-       
-       private static class Count extends Sum{
-               public Count(){
-                       super();
-               }
-       }
-       
-       private static class SumFactory implements FunctionFactory{
-               @Override
-               public Function createFunction(){
-                       return new Sum();
-               }
-       }
-       
-       private static class Sum extends Function{
-               private double summary;
-               public Sum(){
-                       this.summary = 0.0;
-               }
-               @Override
-               public void run(double v){
-                       this.incrCount();
-                       this.summary += v;
-               }
-               
-               @Override
-               public double result(){
-                       return this.summary;
-               }
-       }
-       
-       private static class MinFactory implements FunctionFactory{
-               @Override
-               public Function createFunction(){
-                       return new Min();
-               }
-       }
-       public static class Min extends Function{
-               private double minimum;
-               public Min(){
-                       // TODO is this a bug, or only positive numeric calculation is supported
-                       this.minimum = Double.MAX_VALUE;
-               }
-
-               @Override
-               public void run(double v){
-                       if(v < minimum){
-                               minimum = v;
-                       }
-                       this.incrCount();
-               }
-               
-               @Override
-               public double result(){
-                       return minimum;
-               }
-       }
-       
-       private static class MaxFactory implements FunctionFactory{
-               @Override
-               public Function createFunction(){
-                       return new Max();
-               }
-       }
-       public static class Max extends Function{
-               private double maximum;
-               public Max(){
-                       // TODO is this a bug, or only positive numeric calculation is supported
-                       this.maximum = 0.0;
-               }
-               @Override
-               public void run(double v){
-                       if(v > maximum){
-                               maximum = v;
-                       }
-                       this.incrCount();
-               }
-               
-               @Override
-               public double result(){
-                       return maximum;
-               }
-       }
-       
-       private static class AvgFactory implements FunctionFactory{
-               @Override
-               public Function createFunction(){
-                       return new Avg();
-               }
-       }
-       public static class Avg extends Function{
-               private double total;
-               public Avg(){
-                       this.total = 0.0;
-               }
-               @Override
-               public void run(double v){
-                       total += v;
-                       this.incrCount();
-               }
-               @Override
-               public double result(){
-                       return this.total/this.count;
-               }
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
+
+    public static Map<String, FunctionFactory> functionFactories = new HashMap<>();
+
+    // TODO put this logic to AggregatorFunctionType
+    static {
+        functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
+        functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
+        functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
+        functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
+        functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
+    }
+
+    private List<AggregateFunctionType> types;
+    // private SortedMap<List<String>, List<Function>> group2FunctionMap =
+    //     new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
+
+    private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
+
+    public GroupbyBucket(List<AggregateFunctionType> types) {
+        this.types = types;
+    }
+
+    public void addDatapoint(List<String> groupbyFieldValues, List<Double> values) {
+        // LOG.info("DEBUG: addDatapoint: groupby =["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
+
+        // locate groupby bucket
+        List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
+        if (functions == null) {
+            functions = new ArrayList<Function>();
+            for (AggregateFunctionType type : types) {
+                functions.add(functionFactories.get(type.name()).createFunction());
+            }
+            group2FunctionMap.put(groupbyFieldValues, functions);
+        }
+        int functionIndex = 0;
+        for (Double v : values) {
+            functions.get(functionIndex).run(v);
+            functionIndex++;
+        }
+    }
+
+    public Map<List<String>, List<Double>> result() {
+        Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
+        for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+            List<Double> values = new ArrayList<Double>();
+            for (Function f : entry.getValue()) {
+                values.add(f.result());
+            }
+            result.put(entry.getKey(), values);
+        }
+        return result;
+    }
+
+    public List<GroupbyKeyValue> getGroupbyKeyValue() {
+        List<GroupbyKeyValue>  results = new ArrayList<GroupbyKeyValue>();
+
+        for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+            GroupbyKey key = new GroupbyKey();
+            for (String keyStr:entry.getKey()) {
+                try {
+                    key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
+                } catch (UnsupportedEncodingException e) {
+                    LOG.error(e.getMessage(),e);
+                }
+            }
+            GroupbyValue value = new GroupbyValue();
+            for (Function f : entry.getValue()) {
+                value.add(f.result());
+                value.addMeta(f.count());
+            }
+            results.add(new GroupbyKeyValue(key,value));
+        }
+
+        return results;
+    }
+
+    public static interface FunctionFactory {
+        public Function createFunction();
+    }
+
+    public abstract static class Function {
+        protected int count;
+
+        public abstract void run(double v);
+
+        public abstract double result();
+
+        public int count() {
+            return count;
+        }
+
+        public void incrCount() {
+            count ++;
+        }
+    }
+
+    private static class CountFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Count();
+        }
+    }
+
+
+    private static class Count extends Sum {
+
+        public Count() {
+            super();
+        }
+    }
+
+    private static class SumFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Sum();
+        }
+    }
+
+    private static class Sum extends Function {
+        private double summary;
+
+        public Sum() {
+            this.summary = 0.0;
+        }
+
+        @Override
+        public void run(double v) {
+            this.incrCount();
+            this.summary += v;
+        }
+
+        @Override
+        public double result() {
+            return this.summary;
+        }
+    }
+
+    private static class MinFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Min();
+        }
+    }
+
+    public static class Min extends Function {
+        private double minimum;
+
+        public Min() {
+            // TODO is this a bug, or only positive numeric calculation is supported
+            this.minimum = Double.MAX_VALUE;
+        }
+
+        @Override
+        public void run(double v) {
+            if (v < minimum) {
+                minimum = v;
+            }
+            this.incrCount();
+        }
+
+        @Override
+        public double result() {
+            return minimum;
+        }
+    }
+
+    private static class MaxFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Max();
+        }
+    }
+
+    public static class Max extends Function {
+        private double maximum;
+
+        public Max() {
+            // TODO is this a bug, or only positive numeric calculation is supported
+            this.maximum = 0.0;
+        }
+
+        @Override
+        public void run(double v) {
+            if (v > maximum) {
+                maximum = v;
+            }
+            this.incrCount();
+        }
+
+        @Override
+        public double result() {
+            return maximum;
+        }
+    }
+
+    private static class AvgFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Avg();
+        }
+    }
+
+    public static class Avg extends Function {
+        private double total;
+
+        public Avg() {
+            this.total = 0.0;
+        }
+
+        @Override
+        public void run(double v) {
+            total += v;
+            this.incrCount();
+        }
+
+        @Override
+        public double result() {
+            return this.total / this.count;
+        }
+    }
 }
\ No newline at end of file
index 6635483..b612ccf 100644 (file)
@@ -22,18 +22,21 @@ import java.util.List;
 /**
  * this is default comparator for aggregation. The behavior is to sort by groupby fields ascendantly
  */
-public class GroupbyFieldsComparator implements Comparator<List<String>>{
-       @Override 
-    public int compare(List<String> list1, List<String> list2){
-               if(list1 == null || list2 == null || list1.size() != list2.size())
-                       throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
-               int r = 0;
-               int index = 0;
-               for(String s1 : list1){
-                       r = s1.compareTo(list2.get(index++));
-                       if(r != 0)
-                               return r;
-               }
-               return r;
-       }
+public class GroupbyFieldsComparator implements Comparator<List<String>> {
+
+    @Override
+    public int compare(List<String> list1, List<String> list2) {
+        if (list1 == null || list2 == null || list1.size() != list2.size()) {
+            throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
+        }
+        int r = 0;
+        int index = 0;
+        for (String s1 : list1) {
+            r = s1.compareTo(list2.get(index++));
+            if (r != 0) {
+                return r;
+            }
+        }
+        return r;
+    }
 }
index 341fa00..559061b 100644 (file)
@@ -25,43 +25,52 @@ import java.util.TreeMap;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class HierarchicalAggregateEntity {
-       private String key;
-       private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
-       private List<Double> values = new ArrayList<Double>();
-       private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
-       private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
+    private String key;
+    private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
+    private List<Double> values = new ArrayList<Double>();
+    private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
+    private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
 
-       public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
-               return sortedList;
-       }
-       public void setSortedList(
-                       SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
-               this.sortedList = sortedList;
-       }
-       public List<GroupbyBucket.Function> getTmpValues() {
-               return tmpValues;
-       }
-       public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
-               this.tmpValues = tmpValues;
-       }
-       public String getKey() {
-               return key;
-       }
-       public void setKey(String key) {
-               this.key = key;
-       }
-       public List<Double> getValues() {
-               return values;
-       }
-       public void setValues(List<Double> values) {
-               this.values = values;
-       }
-       public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
-               return children;
-       }
-       public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
-               this.children = children;
-       }
+    public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
+        return sortedList;
+    }
+
+    public void setSortedList(
+                              SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
+        this.sortedList = sortedList;
+    }
+
+    public List<GroupbyBucket.Function> getTmpValues() {
+        return tmpValues;
+    }
+
+    public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
+        this.tmpValues = tmpValues;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public List<Double> getValues() {
+        return values;
+    }
+
+    public void setValues(List<Double> values) {
+        this.values = values;
+    }
+
+    public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
+        return children;
+    }
+
+    public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
+        this.children = children;
+    }
 }
index ecb80ac..8751a74 100755 (executable)
@@ -22,61 +22,61 @@ import java.util.SortedMap;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.query.aggregate.AggregateFunctionType;
 
-public class HierarchicalAggregator extends AbstractAggregator{
-       private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
+public class HierarchicalAggregator extends AbstractAggregator {
+    private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
 
-       public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-               super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-       }
+    public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+    }
 
-       public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-               List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-               // aggregate to root first
-               addDatapoint(root, preAggregatedValues);
-               // go through hierarchical tree
-               HierarchicalAggregateEntity current = root;
-               int i = 0;
-               for(String groupbyField : groupbyFields){
-                       // determine groupbyFieldValue from tag or fields
-                       String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
-                       SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
-                       if(children.get(groupbyFieldValue) == null){
-                               HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
-                               children.put(groupbyFieldValue, tmp);
-                       }
-                       children.get(groupbyFieldValue).setKey(groupbyFieldValue);
-                       addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
-                       current = children.get(groupbyFieldValue);
-               }
-       }
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        // aggregate to root first
+        addDatapoint(root, preAggregatedValues);
+        // go through hierarchical tree
+        HierarchicalAggregateEntity current = root;
+        int i = 0;
+        for (String groupbyField : groupbyFields) {
+            // determine groupbyFieldValue from tag or fields
+            String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
+            SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
+            if (children.get(groupbyFieldValue) == null) {
+                HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
+                children.put(groupbyFieldValue, tmp);
+            }
+            children.get(groupbyFieldValue).setKey(groupbyFieldValue);
+            addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
+            current = children.get(groupbyFieldValue);
+        }
+    }
 
-       private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){
-               List<GroupbyBucket.Function> functions = entity.getTmpValues();
-               // initialize list of function
-               if(functions.isEmpty()){
-                       for(AggregateFunctionType type : aggregateFunctionTypes){
-                               functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction());
-                       }
-               }
-               int functionIndex = 0;
-               for(Double v : values){
-                       functions.get(functionIndex).run(v);
-                       functionIndex++;
-               }
-       }
+    private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values) {
+        List<GroupbyBucket.Function> functions = entity.getTmpValues();
+        // initialize list of function
+        if (functions.isEmpty()) {
+            for (AggregateFunctionType type : aggregateFunctionTypes) {
+                functions.add(GroupbyBucket.functionFactories.get(type.name()).createFunction());
+            }
+        }
+        int functionIndex = 0;
+        for (Double v : values) {
+            functions.get(functionIndex).run(v);
+            functionIndex++;
+        }
+    }
 
-       private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){
-               for(GroupbyBucket.Function f : entity.getTmpValues()){
-                       entity.getValues().add(f.result());
-               }
-               for(HierarchicalAggregateEntity child : entity.getChildren().values()){
-                       finalizeHierarchicalAggregateEntity(child);
-               }
-               entity.setTmpValues(null);
-       }
+    private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity) {
+        for (GroupbyBucket.Function f : entity.getTmpValues()) {
+            entity.getValues().add(f.result());
+        }
+        for (HierarchicalAggregateEntity child : entity.getChildren().values()) {
+            finalizeHierarchicalAggregateEntity(child);
+        }
+        entity.setTmpValues(null);
+    }
 
-       public HierarchicalAggregateEntity result(){
-               finalizeHierarchicalAggregateEntity(root);
-               return this.root;
-       }
+    public HierarchicalAggregateEntity result() {
+        finalizeHierarchicalAggregateEntity(root);
+        return this.root;
+    }
 }
index f62d2c2..8ca24c6 100644 (file)
@@ -24,70 +24,74 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 public class PostFlatAggregateSort {
-       private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
-           SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
-           sortedEntries.addAll(map.entrySet());
-           return sortedEntries;
-       }
+    private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
+        sortedEntries.addAll(map.entrySet());
+        return sortedEntries;
+    }
 
-       /**
-        * sort aggregated results with sort options
-        * @param aggregatedResult aggregated result set, but it is not sorted
-        * @sortOptions sorting options
-        * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
-        */
-       public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){
-               SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
-               List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
-               for (Map.Entry<List<String>, List<Double>> entry : allList) {
-                       result.add(entry);
-                       if (topN > 0 && result.size() >= topN) {
-                               break;
-                       }
-               }
-               return result;
-       }
+    /**
+     * sort aggregated results with sort options
+     * @param aggregatedResult aggregated result set, but it is not sorted
+     * @sortOptions sorting options
+     * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
+     */
+    public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN) {
+        SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
+        List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+        for (Map.Entry<List<String>, List<Double>> entry : allList) {
+            result.add(entry);
+            if (topN > 0 && result.size() >= topN) {
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> {
+        private List<SortOption> sortOptions;
+
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
 
-       private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{
-               private List<SortOption> sortOptions;
-               public MapEntryComparator(List<SortOption> sortOptions){
-                       this.sortOptions = sortOptions;
-               }
-               /**
-                * default to sort by all groupby fields
-                */
-               @Override
-        public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){
-                       int r = 0;
-                       List<String> keyList1 = e1.getKey();
-                       List<Double> valueList1 = e1.getValue();
-                       List<String> keyList2 = e2.getKey();
-                       List<Double> valueList2 = e2.getValue();
-                       for(SortOption so : sortOptions){
-                               int index = so.getIndex();
-                               if (index == -1) {
-                                       continue;
-                               }
-                               if(!so.isInGroupby()){  // sort fields come from functions
-                                       Double value1 = valueList1.get(index);
-                                       Double value2 = valueList2.get(index);
-                                       r = value1.compareTo(value2);
-                               }else{  // sort fields come from groupby fields
-                                       String key1 = keyList1.get(index);
-                                       String key2 = keyList2.get(index);
-                                       r = key1.compareTo(key2);
-                               }
-                               if(r == 0) continue;
-                               if(!so.isAscendant()){
-                                       r = -r;
-                               }
-                               return r;
-                       }
-                       // default to sort by groupby fields ascendently
-                       if(r ==0){ // TODO is this check necessary
-                               return new GroupbyFieldsComparator().compare(keyList1, keyList2);
-                       }
-                       return r;
+        /**
+         * default to sort by all groupby fields
+         */
+        @Override
+        public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2) {
+            int r = 0;
+            List<String> keyList1 = e1.getKey();
+            List<Double> valueList1 = e1.getValue();
+            List<String> keyList2 = e2.getKey();
+            List<Double> valueList2 = e2.getValue();
+            for (SortOption so : sortOptions) {
+                int index = so.getIndex();
+                if (index == -1) {
+                    continue;
+                }
+                if (!so.isInGroupby()) {  // sort fields come from functions
+                    Double value1 = valueList1.get(index);
+                    Double value2 = valueList2.get(index);
+                    r = value1.compareTo(value2);
+                } else {  // sort fields come from groupby fields
+                    String key1 = keyList1.get(index);
+                    String key2 = keyList2.get(index);
+                    r = key1.compareTo(key2);
+                }
+                if (r == 0) {
+                    continue;
+                }
+                if (!so.isAscendant()) {
+                    r = -r;
+                }
+                return r;
+            }
+            // default to sort by groupby fields ascendently
+            if (r == 0) { // TODO is this check necessary
+                return new GroupbyFieldsComparator().compare(keyList1, keyList2);
+            }
+            return r;
         }
-       }
+    }
 }
index 7b0997b..bd475f9 100644 (file)
@@ -24,69 +24,71 @@ import java.util.TreeSet;
 
 public class PostHierarchicalAggregateSort {
 
-       private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
-           SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
-           sortedEntries.addAll(entity.getChildren().entrySet());
-           return sortedEntries;
-       }
+    private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
+        sortedEntries.addAll(entity.getChildren().entrySet());
+        return sortedEntries;
+    }
 
-       /**
-        * sort aggregated results with sort options
+    /**
+     * sort aggregated results with sort options
      *
      * @param result
      * @param sortOptions
      * @return
      */
-       public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){
-               SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
-               result.setSortedList(tmp);
-               result.setChildren(null);
-               for(Map.Entry<String, HierarchicalAggregateEntity> entry : tmp){
-                       sort(entry.getValue(), sortOptions);
-               }
-               return result;
-       }
+    public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
+        result.setSortedList(tmp);
+        result.setChildren(null);
+        for (Map.Entry<String, HierarchicalAggregateEntity> entry : tmp) {
+            sort(entry.getValue(), sortOptions);
+        }
+        return result;
+    }
+
+    private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>> {
+        private List<SortOption> sortOptions;
 
-       private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{
-               private List<SortOption> sortOptions;
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
 
-               public MapEntryComparator(List<SortOption> sortOptions){
-                       this.sortOptions = sortOptions;
-               }
+        /**
+         * default to sort by all groupby fields
+         */
+        @Override
+        public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2) {
+            int r = 0;
+            String key1 = e1.getKey();
+            List<Double> valueList1 = e1.getValue().getValues();
+            String key2 = e2.getKey();
+            List<Double> valueList2 = e2.getValue().getValues();
+            for (SortOption so : sortOptions) {
+                int index = so.getIndex();
+                if (index == -1) {
+                    continue;
+                }
+                if (!so.isInGroupby()) {  // sort fields come from functions
+                    Double value1 = valueList1.get(index);
+                    Double value2 = valueList2.get(index);
+                    r = value1.compareTo(value2);
+                }
+                // sort fields come from groupby fields, then silently ignored
 
-               /**
-                * default to sort by all groupby fields
-                */
-               @Override
-        public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){
-                       int r = 0;
-                       String key1 = e1.getKey();
-                       List<Double> valueList1 = e1.getValue().getValues();
-                       String key2 = e2.getKey();
-                       List<Double> valueList2 = e2.getValue().getValues();
-                       for(SortOption so : sortOptions){
-                               int index = so.getIndex();
-                               if (index == -1) {
-                                       continue;
-                               }
-                               if(!so.isInGroupby()){  // sort fields come from functions
-                                       Double value1 = valueList1.get(index);
-                                       Double value2 = valueList2.get(index);
-                                       r = value1.compareTo(value2);
-                               }  
-                               // sort fields come from groupby fields, then silently ignored
-                               
-                               if(r == 0) continue;
-                               if(!so.isAscendant()){
-                                       r = -r;
-                               }
-                               return r;
-                       }
-                       // default to sort by groupby fields ascendently
-                       if(r ==0){
-                               return key1.compareTo(key2);
-                       }
-                       return r;
+                if (r == 0) {
+                    continue;
+                }
+                if (!so.isAscendant()) {
+                    r = -r;
+                }
+                return r;
+            }
+            // default to sort by groupby fields ascendently
+            if (r == 0) {
+                return key1.compareTo(key2);
+            }
+            return r;
         }
-       }
+    }
 }
index d1578ac..c848122 100644 (file)
@@ -19,31 +19,36 @@ package org.apache.eagle.query.aggregate.timeseries;
 /**
  * sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field3 desc
  * There are 2 SortOption object, then
- * the 1st one is inGroupby=false, index=0, ascendent=true
- * the 2nd one is inGroupby=true, index=1, ascendent=false
+ * the 1st one is inGroupby = false, index=0, ascendent=true
+ * the 2nd one is inGroupby = true, index=1, ascendent=false
  *
  */
 public class SortOption {
-       private boolean inGroupby; // sort field defaultly is not from groupby fields 
-       private int index; // index relative to list of groupby fields or list of functions
-       private boolean ascendant; //asc or desc
+    private boolean inGroupby; // sort field defaultly is not from groupby fields
+    private int index; // index relative to list of groupby fields or list of functions
+    private boolean ascendant; //asc or desc
 
-       public boolean isInGroupby() {
-               return inGroupby;
-       }
-       public void setInGroupby(boolean inGroupby) {
-               this.inGroupby = inGroupby;
-       }
-       public int getIndex() {
-               return index;
-       }
-       public void setIndex(int index) {
-               this.index = index;
-       }
-       public boolean isAscendant() {
-               return ascendant;
-       }
-       public void setAscendant(boolean ascendant) {
-               this.ascendant = ascendant;
-       }
+    public boolean isInGroupby() {
+        return inGroupby;
+    }
+
+    public void setInGroupby(boolean inGroupby) {
+        this.inGroupby = inGroupby;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+    public void setIndex(int index) {
+        this.index = index;
+    }
+
+    public boolean isAscendant() {
+        return ascendant;
+    }
+
+    public void setAscendant(boolean ascendant) {
+        this.ascendant = ascendant;
+    }
 }
index 1360e0c..2457b4e 100644 (file)
@@ -25,45 +25,45 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SortOptionsParser {
-       private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
-       private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
-               
-       public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){
-               List<SortOption> list = new ArrayList<SortOption>();
-               for(String sortOption : sortOptions){
-                       Matcher m = pattern.matcher(sortOption);
-                       if(!m.find()){
-                               throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
-                       }
-                       String field = m.group(1);
-                       if (sortFields != null) {
-                               sortFields.add(field);
-                       }
-                       SortOption so = new SortOption();
-                       list.add(so);
-                       so.setAscendant(m.group(2).equals("asc") ? true : false);
-                       int index = aggregatedFields.indexOf(field); 
-                       if(index > -1){
-                               so.setInGroupby(false);
-                               so.setIndex(index);
-                               continue;
-                       }
-                       if(groupbyFields != null){  // if groupbyFields is not provided, ignore this sort field
-                               index = groupbyFields.indexOf(field);
-                               if(index > -1){
-                                       so.setInGroupby(true);
-                                       so.setIndex(index);
-                                       continue;
-                               }
-                       }
-                       logNonExistingSortByField(field);
-                       so.setInGroupby(false);
-                       so.setIndex(-1);
-               }
-               return list;
-       }
-       
-       private static void logNonExistingSortByField(String sortByField){
-               LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
+    private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
+
+    public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields) {
+        List<SortOption> list = new ArrayList<SortOption>();
+        for (String sortOption : sortOptions) {
+            Matcher m = pattern.matcher(sortOption);
+            if (!m.find()) {
+                throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
+            }
+            String field = m.group(1);
+            if (sortFields != null) {
+                sortFields.add(field);
+            }
+            SortOption so = new SortOption();
+            list.add(so);
+            so.setAscendant(m.group(2).equals("asc") ? true : false);
+            int index = aggregatedFields.indexOf(field);
+            if (index > -1) {
+                so.setInGroupby(false);
+                so.setIndex(index);
+                continue;
+            }
+            if (groupbyFields != null) {  // if groupbyFields is not provided, ignore this sort field
+                index = groupbyFields.indexOf(field);
+                if (index > -1) {
+                    so.setInGroupby(true);
+                    so.setIndex(index);
+                    continue;
+                }
+            }
+            logNonExistingSortByField(field);
+            so.setInGroupby(false);
+            so.setIndex(-1);
+        }
+        return list;
+    }
+
+    private static void logNonExistingSortByField(String sortByField) {
+        LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
+    }
 }
index d8b781e..f4eabcd 100755 (executable)
@@ -18,18 +18,18 @@ package org.apache.eagle.query.aggregate.timeseries;
 
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
-public class SynchronizedAggregator implements Aggregator{
-       private Object mutex = new Object();
-       private Aggregator agg;
-       
-       public SynchronizedAggregator(Aggregator agg){
-               this.agg = agg;
-       }
-       
-       @Override
-       public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-               synchronized(mutex){
-                       agg.accumulate(entity);
-               }
-       }
-}      
+public class SynchronizedAggregator implements Aggregator {
+    private Object mutex = new Object();
+    private Aggregator agg;
+
+    public SynchronizedAggregator(Aggregator agg) {
+        this.agg = agg;
+    }
+
+    @Override
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        synchronized (mutex) {
+            agg.accumulate(entity);
+        }
+    }
+}
index 7c1412e..baa89be 100644 (file)
@@ -19,18 +19,18 @@ package org.apache.eagle.query.aggregate.timeseries;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.EntityCreationListener;
 
-public class SynchronizedEntityCreationListener implements EntityCreationListener{
-       private Object mutex = new Object();
-       private EntityCreationListener listener;
-       
-       public SynchronizedEntityCreationListener(EntityCreationListener listener){
-               this.listener = listener;
-       }
-       
-       @Override
-       public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-               synchronized(mutex){
-                       listener.entityCreated(entity);
-               }
-       }
+public class SynchronizedEntityCreationListener implements EntityCreationListener {
+    private Object mutex = new Object();
+    private EntityCreationListener listener;
+
+    public SynchronizedEntityCreationListener(EntityCreationListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+        synchronized (mutex) {
+            listener.entityCreated(entity);
+        }
+    }
 }
index 5bebe13..e142657 100755 (executable)
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would 
+ * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would
  * save memory for holding all the data in the memory
  *
  * <h3>Aggregate Bucket Structure</h3>
@@ -41,129 +41,135 @@ import java.util.Map;
  *
  */
 public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable {
-       private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
-       private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
-       private long startTime;
-       private long endTime;
-       private long intervalms;
-       private int numFunctions;
-       private int ignoredEntityCounter = 0;
-       
-       public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
-                       long startTime, long endTime, long intervalms){
-               super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-               // guard to avoid too many data points returned
-//             validateTimeRange(startTime, endTime, intervalms);
-               this.startTime = startTime;
-               this.endTime = endTime;
-               this.intervalms = intervalms;
-               this.numFunctions = aggregateFuntionTypes.size();
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
+    private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
+    private long startTime;
+    private long endTime;
+    private long intervalms;
+    private int numFunctions;
+    private int ignoredEntityCounter = 0;
 
-//     @Deprecated
-//     public static void validateTimeRange(long startTime, long endTime, long intervalms){
-//             if(startTime >= endTime || intervalms <= 0){
-//                     throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
-//             }
-//             if((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT){
-//                     throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + ", current # of datapoints is " + (endTime-startTime)/intervalms);
-//             }
-//     }
-       
-       public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-               List<String> groupbyFieldValues = createGroup(entity);
-               // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
-               // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
-               if(entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime){
-                       if(LOG.isDebugEnabled()) LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
-                       this.ignoredEntityCounter ++;
-                       return;
-               }
-               // time series bucket index
-               long located =(entity.getTimestamp() - startTime)/intervalms; 
-               groupbyFieldValues.add(String.valueOf(located));
-               List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-               bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
-       }
-       
-       public Map<List<String>, List<Double>> result(){
-               if(this.ignoredEntityCounter > 0)
-                       LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
-               return bucket.result();
-       }
+    public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
+                                long startTime, long endTime, long intervalms) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+        // guard to avoid too many data points returned
+        //      validateTimeRange(startTime, endTime, intervalms);
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.intervalms = intervalms;
+        this.numFunctions = aggregateFuntionTypes.size();
+    }
 
-       /**
-        * Support new aggregate result
-        *
-        * @return
-        */
-       @Override
-       public List<GroupbyKeyValue> getGroupbyKeyValues(){
-               if(this.ignoredEntityCounter > 0)
-                       LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
-               return bucket.getGroupbyKeyValue();
-       }
-       
-       public Map<List<String>, List<double[]>> getMetric(){
-               // groupbyfields+timeseriesbucket --> aggregatedvalues for different function
-               Map<List<String>, List<Double>> result = bucket.result();
-//             Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
-//             /**
-//              * bug fix: startTime is inclusive and endTime is exclusive
-//              */
-////           int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-//             int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
-//             for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
-//                     // get groups
-//                     List<String> groupbyFields = entry.getKey();
-//                     List<String> copy = new ArrayList<String>(groupbyFields);
-//                     String strTimeseriesIndex = copy.remove(copy.size()-1);
-//                     List<double[]> functionValues = timeseriesDatapoints.get(copy);
-//                     if(functionValues == null){
-//                             functionValues = new ArrayList<double[]>();
-//                             timeseriesDatapoints.put(copy, functionValues);
-//                             for(int i=0; i<numFunctions; i++){
-//                                     functionValues.add(new double[numDatapoints]);
-//                             }
-//                     }
-//                     int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
-//                     int functionIndex = 0;
-//                     for(double[] values : functionValues){
-//                             values[timeseriesIndex] = entry.getValue().get(functionIndex);
-//                             functionIndex++;
-//                     }
-//             }
-//             return timeseriesDatapoints;
-               return toMetric(result,(int)((endTime-1-startTime)/intervalms + 1),this.numFunctions);
-       }
+    //  @Deprecated
+    //  public static void validateTimeRange(long startTime, long endTime, long intervalms) {
+    //      if (startTime >= endTime || intervalms <= 0) {
+    //          throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and "
+    //                + "interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
+    //      }
+    //      if ((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT) {
+    //          throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT
+    //                  + ", current # of datapoints is " + (endTime-startTime)/intervalms);
+    //      }
+    //  }
 
-       public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){
-               Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
-               /**
-                * bug fix: startTime is inclusive and endTime is exclusive
-                */
-//             int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-//             int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
-               for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
-                       // get groups
-                       List<String> groupbyFields = entry.getKey();
-                       List<String> copy = new ArrayList<String>(groupbyFields);
-                       String strTimeseriesIndex = copy.remove(copy.size()-1);
-                       List<double[]> functionValues = timeseriesDatapoints.get(copy);
-                       if(functionValues == null){
-                               functionValues = new ArrayList<double[]>();
-                               timeseriesDatapoints.put(copy, functionValues);
-                               for(int i=0; i<numFunctions; i++){
-                                       functionValues.add(new double[numDatapoints]);
-                               }
-                       }
-                       int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
-                       int functionIndex = 0;
-                       for(double[] values : functionValues){
-                               values[timeseriesIndex] = entry.getValue().get(functionIndex);
-                               functionIndex++;
-                       }
-               }
-               return timeseriesDatapoints;
-       }
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<String> groupbyFieldValues = createGroup(entity);
+        // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
+        // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
+        if (entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
+            }
+            this.ignoredEntityCounter ++;
+            return;
+        }
+        // time series bucket index
+        long located = (entity.getTimestamp() - startTime) / intervalms;
+        groupbyFieldValues.add(String.valueOf(located));
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+    }
+
+    public Map<List<String>, List<Double>> result() {
+        if (this.ignoredEntityCounter > 0) {
+            LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime);
+        }
+        return bucket.result();
+    }
+
+    /**
+     * Support new aggregate result
+     *
+     * @return
+     */
+    @Override
+    public List<GroupbyKeyValue> getGroupbyKeyValues() {
+        if (this.ignoredEntityCounter > 0) {
+            LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime);
+        }
+        return bucket.getGroupbyKeyValue();
+    }
+
+    public Map<List<String>, List<double[]>> getMetric() {
+        // groupbyfields+timeseriesbucket --> aggregatedvalues for different function
+        Map<List<String>, List<Double>> result = bucket.result();
+        //      Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
+        //      /**
+        //       * bug fix: startTime is inclusive and endTime is exclusive
+        //       */
+        ////        int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
+        //      int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
+        //      for (Map.Entry<List<String>, List<Double>> entry : result.entrySet()) {
+        //          // get groups
+        //          List<String> groupbyFields = entry.getKey();
+        //          List<String> copy = new ArrayList<String>(groupbyFields);
+        //          String strTimeseriesIndex = copy.remove(copy.size()-1);
+        //          List<double[]> functionValues = timeseriesDatapoints.get(copy);
+        //          if (functionValues == null) {
+        //              functionValues = new ArrayList<double[]>();
+        //              timeseriesDatapoints.put(copy, functionValues);
+        //              for (int i = 0; i<numFunctions; i++) {
+        //                  functionValues.add(new double[numDatapoints]);
+        //              }
+        //          }
+        //          int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
+        //          int functionIndex = 0;
+        //          for (double[] values : functionValues) {
+        //              values[timeseriesIndex] = entry.getValue().get(functionIndex);
+        //              functionIndex++;
+        //          }
+        //      }
+        //      return timeseriesDatapoints;
+        return toMetric(result,(int)((endTime - 1 - startTime) / intervalms + 1), this.numFunctions);
+    }
+
+    public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions) {
+        Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
+        /**
+         * bug fix: startTime is inclusive and endTime is exclusive
+         */
+        //      int numDatapoints = (int)((endTime-startTime)/intervalms + 1);
+        //      int numDatapoints = (int)((endTime-1-startTime)/intervalms + 1);
+        for (Map.Entry<List<String>, List<Double>> entry : result.entrySet()) {
+            // get groups
+            List<String> groupbyFields = entry.getKey();
+            List<String> copy = new ArrayList<String>(groupbyFields);
+            String strTimeseriesIndex = copy.remove(copy.size() - 1);
+            List<double[]> functionValues = timeseriesDatapoints.get(copy);
+            if (functionValues == null) {
+                functionValues = new ArrayList<double[]>();
+                timeseriesDatapoints.put(copy, functionValues);
+                for (int i = 0; i < numFunctions; i++) {
+                    functionValues.add(new double[numDatapoints]);
+                }
+            }
+            int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
+            int functionIndex = 0;
+            for (double[] values : functionValues) {
+                values[timeseriesIndex] = entry.getValue().get(functionIndex);
+                functionIndex++;
+            }
+        }
+        return timeseriesDatapoints;
+    }
 }
index d662658..78fa010 100644 (file)
@@ -26,51 +26,51 @@ import org.slf4j.LoggerFactory;
  * only numeric aggregation is supported and number type supported is double
  */
 public class TimeSeriesBucket {
-       private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
-       private long startTime;
-       private long endTime;
-       private long interval;
-       
-       // map of aggregation function to aggregated values 
-       List<double[]> aggregatedValues = new ArrayList<double[]>();
-       
-       // align from the startTime
-       /**
-        * 
-        * @param startTime milliseconds
-        * @param endTime milliseconds
-        * @param intervalMillseconds
-        * @param aggFunctions
-        */
-       public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){
-               int count =(int)((endTime-startTime)/intervalms);
-               for(int i=0; i<numAggFunctions; i++){
-                       aggregatedValues.add(new double[count]);
-               }
-       }
-       
-       /**
-        * add datapoint which has a list of values for different aggregate functions
-        * for example, sum(numHosts), count(*), avg(timespan) etc
-        * @param timestamp
-        * @param values
-        */
-       public void addDataPoint(long timestamp, List<Double> values){
-               // locate timeseries bucket
-               if(timestamp < startTime || timestamp > endTime){
-                       LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
-                       return;
-               }
-               int located =(int)((timestamp - startTime)/interval);
-               int index = 0;
-               for(Double src : values){
-                       double[] timeSeriesValues = aggregatedValues.get(index);
-                       timeSeriesValues[located] += src;
-                       index++;
-               }
-       }
-       
-       public List<double[]> aggregatedValues(){
-               return this.aggregatedValues;
-       }
+    private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
+    private long startTime;
+    private long endTime;
+    private long interval;
+
+    // map of aggregation function to aggregated values
+    List<double[]> aggregatedValues = new ArrayList<double[]>();
+
+    // align from the startTime
+    /**
+     *
+     * @param startTime milliseconds
+     * @param endTime milliseconds
+     * @param intervalMillseconds
+     * @param aggFunctions
+     */
+    public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions) {
+        int count = (int)((endTime - startTime) / intervalms);
+        for (int i = 0; i < numAggFunctions; i++) {
+            aggregatedValues.add(new double[count]);
+        }
+    }
+
+    /**
+     * add datapoint which has a list of values for different aggregate functions
+     * for example, sum(numHosts), count(*), avg(timespan) etc
+     * @param timestamp
+     * @param values
+     */
+    public void addDataPoint(long timestamp, List<Double> values) {
+        // locate timeseries bucket
+        if (timestamp < startTime || timestamp > endTime) {
+            LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
+            return;
+        }
+        int located = (int)((timestamp - startTime) / interval);
+        int index = 0;
+        for (Double src : values) {
+            double[] timeSeriesValues = aggregatedValues.get(index);
+            timeSeriesValues[located] += src;
+            index++;
+        }
+    }
+
+    public List<double[]> aggregatedValues() {
+        return this.aggregatedValues;
+    }
 }
index c0a6e06..ae00fdf 100644 (file)
@@ -25,127 +25,127 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 public class TimeSeriesPostFlatAggregateSort {
-       // private static final Logger logger =
-       // LoggerFactory.getLogger(PostFlatAggregateSort.class);
-
-       private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
-                       Map<List<String>, List<Double>> mapForSort,
-                       List<SortOption> sortOptions) {
-               SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(
-                               new MapEntryComparator(sortOptions));
-               sortedEntries.addAll(mapForSort.entrySet());
-               return sortedEntries;
-       }
-
-       /**
-        * sort aggregated results with sort options
-        * 
-        * @param entity
-        */
-       public static List<Map.Entry<List<String>, List<double[]>>> sort(
-                       Map<List<String>, List<Double>> mapForSort,
-                       Map<List<String>, List<double[]>> valueMap,
-                       List<SortOption> sortOptions, int topN) {
-
-               processIndex(sortOptions);
-               List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
-               SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue(
-                               mapForSort, sortOptions);
-               for (Map.Entry<List<String>, List<Double>> entry : sortedSet) {
-                       List<String> key = entry.getKey();
-                       List<double[]> value = valueMap.get(key);
-                       if (value != null) {
-                               Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value);
-                               result.add(newEntry);
-                               if (topN > 0 && result.size() >= topN) {
-                                       break;
-                               }
-                       }
-               }
-               return result;
-       }
-
-       private static void processIndex(List<SortOption> sortOptions) {
-               for (int i = 0; i < sortOptions.size(); ++i) {
-                       SortOption so = sortOptions.get(i);
-                       so.setIndex(i);
-               }
-       }
-
-       private static class MapEntryComparator implements
-                       Comparator<Map.Entry<List<String>, List<Double>>> {
-               private List<SortOption> sortOptions;
-
-               public MapEntryComparator(List<SortOption> sortOptions) {
-                       this.sortOptions = sortOptions;
-               }
-
-               /**
-                * default to sort by all groupby fields
-                */
-               @Override
-               public int compare(Map.Entry<List<String>, List<Double>> e1,
-                               Map.Entry<List<String>, List<Double>> e2) {
-                       int r = 0;
-                       List<String> keyList1 = e1.getKey();
-                       List<Double> valueList1 = e1.getValue();
-                       List<String> keyList2 = e2.getKey();
-                       List<Double> valueList2 = e2.getValue();
-                       for (SortOption so : sortOptions) {
-                               int index = so.getIndex();
-                               if (index == -1) {
-                                       continue;
-                               }
-                               if (!so.isInGroupby()) { // sort fields come from functions
-                                       Double value1 = valueList1.get(index);
-                                       Double value2 = valueList2.get(index);
-                                       r = value1.compareTo(value2);
-                               } else { // sort fields come from groupby fields
-                                       String key1 = keyList1.get(index);
-                                       String key2 = keyList2.get(index);
-                                       r = key1.compareTo(key2);
-                               }
-                               if (r == 0)
-                                       continue;
-                               if (!so.isAscendant()) {
-                                       r = -r;
-                               }
-                               return r;
-                       }
-                       // default to sort by groupby fields ascendently
-                       if (r == 0) { // TODO is this check necessary
-                               return new GroupbyFieldsComparator()
-                                               .compare(keyList1, keyList2);
-                       }
-                       return r;
-               }
-       }
-
-       static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
-               private final K key;
-               private final V value;
-
-               ImmutableEntry(K key, V value) {
-                       this.key = key;
-                       this.value = value;
-               }
-
-               @Override
-               public K getKey() {
-                       return key;
-               }
-
-               @Override
-               public V getValue() {
-                       return value;
-               }
-
-               @Override
-               public final V setValue(V value) {
-                       throw new UnsupportedOperationException();
-               }
-
-               private static final long serialVersionUID = 0;
-       }
+    // private static final Logger logger =
+    // LoggerFactory.getLogger(PostFlatAggregateSort.class);
+
+    private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
+                                                                                Map<List<String>, List<Double>> mapForSort,
+                                                                                List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(
+            new MapEntryComparator(sortOptions));
+        sortedEntries.addAll(mapForSort.entrySet());
+        return sortedEntries;
+    }
+
+    /**
+     * sort aggregated results with sort options
+     *
+     * @param entity
+     */
+    public static List<Map.Entry<List<String>, List<double[]>>> sort(
+                                                                     Map<List<String>, List<Double>> mapForSort,
+                                                                     Map<List<String>, List<double[]>> valueMap,
+                                                                     List<SortOption> sortOptions, int topN) {
+
+        processIndex(sortOptions);
+        List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+        SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue(
+                                                                                 mapForSort, sortOptions);
+        for (Map.Entry<List<String>, List<Double>> entry : sortedSet) {
+            List<String> key = entry.getKey();
+            List<double[]> value = valueMap.get(key);
+            if (value != null) {
+                Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value);
+                result.add(newEntry);
+                if (topN > 0 && result.size() >= topN) {
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void processIndex(List<SortOption> sortOptions) {
+        for (int i = 0; i < sortOptions.size(); ++i) {
+            SortOption so = sortOptions.get(i);
+            so.setIndex(i);
+        }
+    }
+
+    private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> {
+        private List<SortOption> sortOptions;
+
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
+
+        /**
+         * default to sort by all groupby fields
+         */
+        @Override
+        public int compare(Map.Entry<List<String>, List<Double>> e1,
+                           Map.Entry<List<String>, List<Double>> e2) {
+            int r = 0;
+            List<String> keyList1 = e1.getKey();
+            List<Double> valueList1 = e1.getValue();
+            List<String> keyList2 = e2.getKey();
+            List<Double> valueList2 = e2.getValue();
+            for (SortOption so : sortOptions) {
+                int index = so.getIndex();
+                if (index == -1) {
+                    continue;
+                }
+                if (!so.isInGroupby()) { // sort fields come from functions
+                    Double value1 = valueList1.get(index);
+                    Double value2 = valueList2.get(index);
+                    r = value1.compareTo(value2);
+                } else { // sort fields come from groupby fields
+                    String key1 = keyList1.get(index);
+                    String key2 = keyList2.get(index);
+                    r = key1.compareTo(key2);
+                }
+                if (r == 0) {
+                    continue;
+                }
+                if (!so.isAscendant()) {
+                    r = -r;
+                }
+                return r;
+            }
+            // default to sort by groupby fields ascendently
+            if (r == 0) { // TODO is this check necessary
+                return new GroupbyFieldsComparator()
+                    .compare(keyList1, keyList2);
+            }
+            return r;
+        }
+    }
+
+    static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
+        private final K key;
+        private final V value;
+
+        ImmutableEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public K getKey() {
+            return key;
+        }
+
+        @Override
+  &n