NIFI-5938: Added ability to infer record schema on read from JsonTreeReader, JsonPath...
author     Mark Payne <markap14@hotmail.com>
Wed, 19 Dec 2018 23:20:52 +0000 (18:20 -0500)
committer  Matthew Burgess <mattyb149@apache.org>
Mon, 11 Feb 2019 17:56:50 +0000 (12:56 -0500)
 - Updated UpdateRecord and RecordPath to automatically update the Record's schema when performing an update, and updated UpdateRecord to perform the updates on the first record before obtaining the Writer schema. This allows the Writer to inherit the schema of the updated Record instead of the schema of the Record as it was when read (a short sketch of the underlying Record API follows this list).
 - Updated JoltTransformRecord so that the schema is inferred from the first transformed object before the schema is passed to the Record Writer, so that if the writer inherits its schema from the record, the inherited schema is the transformed schema
 - Updated LookupRecord to allow for Record fields to be arbitrarily added
 - Implemented ContentClaimInputStream
 - Added controller service for caching schemas
 - Updated QueryRecord to automatically cache schemas up to a bounded number of schemas, which significantly improves throughput in many cases, especially with inferred schemas.
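
The schema-inheritance behavior described in the first bullet relies on the Record.setValue(RecordField, Object) and incorporateInactiveFields() methods added in this commit (see the Record and MapRecord diffs below). A minimal sketch in Java, with illustrative field names and values:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;

    static Record addAgeField() {
        final RecordSchema schema = new SimpleRecordSchema(
            Collections.singletonList(new RecordField("name", RecordFieldType.STRING.getDataType())));
        final Map<String, Object> values = new HashMap<>();
        values.put("name", "Alice");
        final Record record = new MapRecord(schema, values);

        // "age" is not in the schema, so setValue(RecordField, Object) tracks it as an 'inactive' field
        record.setValue(new RecordField("age", RecordFieldType.INT.getDataType()), 42);

        // Fold the inactive field into the schema so a writer that inherits the record's schema sees "age"
        record.incorporateInactiveFields();
        return record;
    }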

NIFI-5938: Updated AvroTypeUtil so that if an Avro Schema is created using a field name that is not valid for Avro, a different, valid field name is used and an alias is added for the original name, so that the field is still looked up appropriately. Fixed a bug in finding the appropriate Avro field when aliases are used. Updated ContentClaimInputStream so that if mark() is called followed by multiple calls to reset(), each reset() call succeeds instead of only the first one (the JavaDoc for InputStream appears to allow either behavior; an InputStream is even free to let reset() rewind to the beginning of the stream when mark() was never called, rather than requiring a prior call to mark()). A minimal illustration of the mark()/reset() contract follows.
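
A minimal sketch of the mark()/reset() contract described above; ContentClaimInputStream itself is created by the framework from a content claim, so a ByteArrayInputStream stands in here purely to illustrate the behavior:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    static void markResetDemo() throws IOException {
        final InputStream in = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
        in.mark(Integer.MAX_VALUE);
        in.read();   // consume some data
        in.reset();  // rewind to the mark
        in.read();
        in.reset();  // a second reset() must also succeed and rewind to the same mark
    }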

NIFI-5938: Added another unit test for AvroTypeUtil

NIFI-5938: When using an inferred schema in CSVReader, do not treat the first record as a header line. Also addressed a bug in StandardConfigurationContext that was exposed by CSVReader, in which calling getProperty(PropertyDescriptor) did not properly look up the canonical representation of the PropertyDescriptor from the component before attempting to get a default value. A hedged sketch of that lookup order follows.
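
A hedged sketch of the canonical-descriptor lookup described above; the method and parameter names are illustrative and are not the actual StandardConfigurationContext code:

    import java.util.Map;
    import org.apache.nifi.components.ConfigurableComponent;
    import org.apache.nifi.components.PropertyDescriptor;

    static String getEffectiveValue(final ConfigurableComponent component,
                                    final Map<PropertyDescriptor, String> configuredValues,
                                    final PropertyDescriptor requested) {
        // Resolve the component's own (canonical) descriptor first so that its default value is used
        // when nothing is configured, rather than the default of the descriptor passed in by the caller.
        final PropertyDescriptor canonical = component.getPropertyDescriptor(requested.getName());
        final PropertyDescriptor descriptor = (canonical != null) ? canonical : requested;
        final String configured = configuredValues.get(descriptor);
        return (configured != null) ? configured : descriptor.getDefaultValue();
    }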

Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #3253

116 files changed:
nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java
nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/WriteResult.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/DateTimeMatcher.java [new file with mode: 0644]
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/DateTimeMatcherCompiler.java [new file with mode: 0644]
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/ListDateTimeMatcher.java [new file with mode: 0644]
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java [new file with mode: 0644]
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/SimpleDateFormatMatcher.java [new file with mode: 0644]
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/StartsWithDigitsDateTimeMatcher.java [new file with mode: 0644]
nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/text/TestRegexDateTimeMatcher.java [new file with mode: 0644]
nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java [new file with mode: 0644]
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSchemaCacheService.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/CachedSchemaAccessStrategy.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/HierarchicalSchemaInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/InferSchemaAccessStrategy.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/RecordSource.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/RecordSourceFactory.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/SchemaInferenceEngine.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/SchemaInferenceUtil.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/TimeValueInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/VolatileSchemaCache.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/CacheIdSchemaAccessWriter.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlArrayNode.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlContainerNode.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlNode.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlNodeType.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlRecordSource.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlTextNode.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.csv.CSVReader/additionalDetails.html
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestVolatileSchemaCache.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/prov-events.csv [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/docs-example.json [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/prov-events.json [new file with mode: 0644]
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/xml/people_nested.xml
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/xml/person.xml

index a753579..b69fbce 100644 (file)
 
 package org.apache.nifi.record.path;
 
-import java.util.Objects;
-
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 
+import java.util.Objects;
+
 public class ArrayIndexFieldValue extends StandardFieldValue {
     private final int index;
 
@@ -54,6 +55,11 @@ public class ArrayIndexFieldValue extends StandardFieldValue {
     }
 
     @Override
+    public void updateValue(final Object newValue, final DataType dataType) {
+        getParentRecord().get().setArrayValue(getField().getFieldName(), getArrayIndex(), newValue);
+    }
+
+    @Override
     public int hashCode() {
         return Objects.hash(getValue(), getField(), getParent(), index);
     }
index 88828a5..084ce73 100644 (file)
 
 package org.apache.nifi.record.path;
 
-import java.util.Optional;
-
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 
+import java.util.Optional;
+
 public interface FieldValue {
     /**
      * @return the value of the field
@@ -51,4 +52,14 @@ public interface FieldValue {
      * @param newValue the new value to set on the record field
      */
     void updateValue(Object newValue);
+
+    /**
+     * Updates the record to which the field belongs, so that it now has the given value. If the FieldValue
+     * points to a Field that does not currently exist in the Record, the field will be created in the Record Schema
+     * as an 'inactive field', which can then be incorporated into the Record's schema by calling {@link Record#incorporateInactiveFields()}.
+     *
+     * @param newValue the value to set for the field
+     * @param dataType the data type to use if the Record's schema does not already include this field
+     */
+    void updateValue(Object newValue, DataType dataType);
 }
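
A short usage sketch of the two-argument updateValue() added above, assuming the record's root is a Record and "/newField" names a direct child that may not yet exist (path and value are illustrative). The newly created field only becomes part of the schema once incorporateInactiveFields() is called:

    import org.apache.nifi.record.path.RecordPath;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordFieldType;

    static void setNewField(final Record record) {
        final RecordPath path = RecordPath.compile("/newField");
        path.evaluate(record).getSelectedFields()
            .forEach(fieldValue -> fieldValue.updateValue(42, RecordFieldType.INT.getDataType()));
        record.incorporateInactiveFields();  // fold the newly created field into the record's schema
    }
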
index f52af3d..f28064d 100644 (file)
 
 package org.apache.nifi.record.path;
 
-import java.util.Objects;
-
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.RecordField;
 
+import java.util.Objects;
+
 public class MapEntryFieldValue extends StandardFieldValue {
     private final String mapKey;
 
@@ -39,6 +40,11 @@ public class MapEntryFieldValue extends StandardFieldValue {
     }
 
     @Override
+    public void updateValue(final Object newValue, final DataType dataType) {
+        getParentRecord().get().setMapValue(getField().getFieldName(), getMapKey(), newValue);
+    }
+
+    @Override
     public int hashCode() {
         return Objects.hash(getValue(), getField(), getParent(), mapKey);
     }
index 7526c0c..8ab03ca 100644 (file)
 
 package org.apache.nifi.record.path;
 
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.Optional;
-
 import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+
 public class StandardFieldValue implements FieldValue {
     private final Object value;
     private final RecordField field;
@@ -117,10 +118,21 @@ public class StandardFieldValue implements FieldValue {
 
     @Override
     public void updateValue(final Object newValue) {
+        updateValue(newValue, getField());
+    }
+
+    @Override
+    public void updateValue(final Object newValue, final DataType dataType) {
+        final RecordField currentField = getField();
+        final RecordField recordField = new RecordField(currentField.getFieldName(), dataType, currentField.getDefaultValue(), currentField.getAliases(), currentField.isNullable());
+        updateValue(newValue, recordField);
+    }
+
+    private void updateValue(final Object newValue, final RecordField field) {
         final Optional<Record> parentRecord = getParentRecord();
         if (!parentRecord.isPresent()) {
             if (value instanceof Record) {
-                ((Record) value).setValue(getField().getFieldName(), newValue);
+                ((Record) value).setValue(field, newValue);
                 return;
             } else if (value == null) {
                 return; // value is null, nothing to update
@@ -129,6 +141,7 @@ public class StandardFieldValue implements FieldValue {
             }
         }
 
-        parentRecord.get().setValue(getField().getFieldName(), newValue);
+        parentRecord.get().setValue(field, newValue);
+
     }
 }
index 188f032..1346e34 100644 (file)
 
 package org.apache.nifi.serialization;
 
-import java.io.Closeable;
-import java.io.IOException;
-
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 /**
  * <p>
  * A RowRecordReader is responsible for parsing data and returning a record at a time
@@ -47,7 +47,7 @@ public interface RecordReader extends Closeable {
      * @throws SchemaValidationException if a Record contains a field that violates the schema and cannot be coerced into the appropriate field type.
      */
     default Record nextRecord() throws IOException, MalformedRecordException {
-        return nextRecord(true, true);
+        return nextRecord(true, false);
     }
 
     /**
index 7f78448..453d88a 100644 (file)
@@ -43,11 +43,11 @@ public class SimpleRecordSchema implements RecordSchema {
     private volatile int hashCode;
 
     public SimpleRecordSchema(final List<RecordField> fields) {
-        this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY);
+        this(fields, null, null, false, SchemaIdentifier.EMPTY);
     }
 
     public SimpleRecordSchema(final List<RecordField> fields, final SchemaIdentifier id) {
-        this(fields, createText(fields), null, false, id);
+        this(fields, null, null, false, id);
     }
 
     public SimpleRecordSchema(final String text, final String schemaFormat, final SchemaIdentifier id) {
index 3fb2741..ac77493 100644 (file)
@@ -51,7 +51,7 @@ public interface WriteResult {
      * @param attributes the attributes to add to the FlowFile
      * @return A {@link WriteResult} representing the given parameters
      */
-    public static WriteResult of(final int recordCount, final Map<String, String> attributes) {
+    static WriteResult of(final int recordCount, final Map<String, String> attributes) {
         return new WriteResult() {
             @Override
             public int getRecordCount() {
@@ -65,5 +65,5 @@ public interface WriteResult {
         };
     }
 
-    public static final WriteResult EMPTY = of(0, Collections.emptyMap());
+    WriteResult EMPTY = of(0, Collections.emptyMap());
 }
index 57b7ac3..2f8a766 100644 (file)
 
 package org.apache.nifi.serialization.record;
 
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
 import java.nio.charset.StandardCharsets;
 import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
 
-import org.apache.nifi.serialization.SchemaValidationException;
-import org.apache.nifi.serialization.record.type.ArrayDataType;
-import org.apache.nifi.serialization.record.type.MapDataType;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
-
 public class MapRecord implements Record {
     private RecordSchema schema;
     private final Map<String, Object> values;
     private Optional<SerializedForm> serializedForm;
     private final boolean checkTypes;
     private final boolean dropUnknownFields;
-
+    private Set<RecordField> inactiveFields = null;
 
     public MapRecord(final RecordSchema schema, final Map<String, Object> values) {
         this(schema, values, false, false);
@@ -304,11 +309,33 @@ public class MapRecord implements Record {
     }
 
     @Override
+    public Map<String, Object> toMap() {
+        return Collections.unmodifiableMap(values);
+    }
+
+    @Override
+    public void setValue(final RecordField field, final Object value) {
+        final Optional<RecordField> existingField = setValueAndGetField(field.getFieldName(), value);
+
+        if (!existingField.isPresent()) {
+            if (inactiveFields == null) {
+                inactiveFields = new LinkedHashSet<>();
+            }
+
+            inactiveFields.add(field);
+        }
+    }
+
+    @Override
     public void setValue(final String fieldName, final Object value) {
+        setValueAndGetField(fieldName, value);
+    }
+
+    private Optional<RecordField> setValueAndGetField(final String fieldName, final Object value) {
         final Optional<RecordField> field = getSchema().getField(fieldName);
         if (!field.isPresent()) {
             if (dropUnknownFields) {
-                return;
+                return field;
             }
 
             final Object previousValue = values.put(fieldName, value);
@@ -316,7 +343,7 @@ public class MapRecord implements Record {
                 serializedForm = Optional.empty();
             }
 
-            return;
+            return field;
         }
 
         final RecordField recordField = field.get();
@@ -325,6 +352,8 @@ public class MapRecord implements Record {
         if (!Objects.equals(coerced, previousValue)) {
             serializedForm = Optional.empty();
         }
+
+        return field;
     }
 
     @Override
@@ -406,6 +435,24 @@ public class MapRecord implements Record {
     }
 
     @Override
+    public void incorporateInactiveFields() {
+        if (inactiveFields == null) {
+            return;
+        }
+
+        final List<RecordField> allFields = new ArrayList<>(schema.getFieldCount() + inactiveFields.size());
+        allFields.addAll(schema.getFields());
+
+        for (final RecordField field : inactiveFields) {
+            if (!allFields.contains(field)) {
+                allFields.add(field);
+            }
+        }
+
+        this.schema = new SimpleRecordSchema(allFields);
+    }
+
+    @Override
     public Set<String> getRawFieldNames() {
         return values.keySet();
     }
index 1a89225..a916d29 100644 (file)
 
 package org.apache.nifi.serialization.record;
 
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
 import java.util.Date;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
-
 public interface Record {
 
     RecordSchema getSchema();
@@ -53,8 +54,8 @@ public interface Record {
      * default value is <code>null</code>. Note that all values for this Record will still be valid according
      * to this Record's Schema after this operation completes, as no type will be changed except to become more
      * lenient. However, if incorporating the other schema does modify this schema, then the schema text
-     * returned by {@link #getSchemaText()}, the schema format returned by {@link #getSchemaFormat()}, and
-     * the SchemaIdentifier returned by {@link #getIdentifier()} for this record's schema may all become Empty.
+     * returned by {@link RecordSchema#getSchemaText() getSchemaText()}, the schema format returned by {@link RecordSchema#getSchemaFormat() getSchemaFormat()}, and
+     * the SchemaIdentifier returned by {@link RecordSchema#getIdentifier() getIdentifier()} for this record's schema may all become Empty.
      *
      * @param other the other schema to incorporate into this Record's schema
      *
@@ -63,6 +64,14 @@ public interface Record {
     void incorporateSchema(RecordSchema other);
 
     /**
+     * Updates the Record's schema to incorporate all of the fields that were added via the {@link #setValue(RecordField, Object)}
+     * method that did not exist in the schema.
+     *
+     * @throws UnsupportedOperationException if this record does not support incorporating other fields
+     */
+    void incorporateInactiveFields();
+
+    /**
      * <p>
      * Returns a view of the the values of the fields in this Record. Note that this method returns values only for
      * those entries in the Record's schema. This allows the Record to guarantee that it will return the values in
@@ -120,6 +129,20 @@ public interface Record {
     void setValue(String fieldName, Object value);
 
     /**
+     * Updates the value of the given field to the given value. If the field specified is not present in this Record's schema,
+     * this method will track of the field as an 'inactive field', which can then be added into the Record's schema via the
+     * {@link #incorporateInactiveFields} method. This method should not be called after each invocation of {@link #setValue(RecordField, Object)}
+     * but rather should be called only once all updates to the Record have completed, in order to optimize performance.
+     *
+     * If this method changes any value in the Record, any {@link SerializedForm} that was provided will be removed (i.e., any
+     * subsequent call to {@link #getSerializedForm()} will return an empty Optional).
+     *
+     * @param field the field to update
+     * @param value the value to set
+     */
+    void setValue(RecordField field, Object value);
+
+    /**
      * Updates the value of a the specified index of a field. If the field specified
      * is not present in this Record's schema, this method will do nothing. If the field specified
      * is not an Array, an IllegalArgumentException will be thrown. If the field specified is an array
@@ -164,4 +187,10 @@ public interface Record {
      * @return a Set that contains the names of all of the fields that are present in the Record
      */
     Set<String> getRawFieldNames();
+
+    /**
+     * Converts the Record into a Map whose keys are the same as the Record's field names and the values are the field values
+     * @return a Map that represents the values in the Record.
+     */
+    Map<String, Object> toMap();
 }
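
A one-line usage note for the new toMap() accessor (the record variable is assumed); in the MapRecord implementation shown above it returns an unmodifiable view of the record's raw field values:

    final Map<String, Object> asMap = record.toMap();  // unmodifiable view of the raw field values
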
index 2c4954c..125a44d 100644 (file)
@@ -71,7 +71,7 @@ public class RecordField {
 
         // If aliases is the empty set, don't bother with the expense of wrapping in an unmodifiableSet.
         Objects.requireNonNull(aliases);
-        if ((Set<?>) aliases == Collections.EMPTY_SET) {
+        if (aliases == Collections.EMPTY_SET) {
             this.aliases = aliases;
         } else {
             this.aliases = Collections.unmodifiableSet(aliases);
index 2e6b5d7..de9aa58 100644 (file)
@@ -23,18 +23,16 @@ import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public enum RecordFieldType {
     /**
-     * A String field type. Fields of this type use a {@code java.lang.String} value.
-     */
-    STRING("string"),
-
-    /**
      * A boolean field type. Fields of this type use a {@code boolean} value.
      */
     BOOLEAN("boolean"),
@@ -45,29 +43,24 @@ public enum RecordFieldType {
     BYTE("byte"),
 
     /**
-     * A char field type. Fields of this type use a {@code char} value.
-     */
-    CHAR("char"),
-
-    /**
      * A short field type. Fields of this type use a {@code short} value.
      */
-    SHORT("short"),
+    SHORT("short", BYTE),
 
     /**
      * An int field type. Fields of this type use an {@code int} value.
      */
-    INT("int"),
+    INT("int", SHORT, BYTE),
 
     /**
-     * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value.
+     * A long field type. Fields of this type use a {@code long} value.
      */
-    BIGINT("bigint"),
+    LONG("long", SHORT, BYTE, INT),
 
     /**
-     * A long field type. Fields of this type use a {@code long} value.
+     * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value.
      */
-    LONG("long"),
+    BIGINT("bigint", SHORT, BYTE, INT, LONG),
 
     /**
      * A float field type. Fields of this type use a {@code float} value.
@@ -77,7 +70,7 @@ public enum RecordFieldType {
     /**
      * A double field type. Fields of this type use a {@code double} value.
      */
-    DOUBLE("double"),
+    DOUBLE("double", FLOAT),
 
     /**
      * A date field type. Fields of this type use a {@code java.sql.Date} value.
@@ -95,6 +88,16 @@ public enum RecordFieldType {
     TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
 
     /**
+     * A char field type. Fields of this type use a {@code char} value.
+     */
+    CHAR("char"),
+
+    /**
+     * A String field type. Fields of this type use a {@code java.lang.String} value.
+     */
+    STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP),
+
+    /**
      * <p>
      * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be
      * created by providing the {@link RecordSchema} for the record:
@@ -215,23 +218,38 @@ public enum RecordFieldType {
     private final String simpleName;
     private final String defaultFormat;
     private final DataType defaultDataType;
+    private final Set<RecordFieldType> narrowDataTypes;
 
     private RecordFieldType(final String simpleName) {
-        this(simpleName, null);
+        this.simpleName = simpleName;
+        this.defaultFormat = null;
+        this.defaultDataType = new DataType(this, defaultFormat);
+        this.narrowDataTypes = Collections.emptySet();
+    }
+
+    private RecordFieldType(final String simpleName, final RecordFieldType... narrowDataTypes) {
+        this.simpleName = simpleName;
+        this.defaultFormat = null;
+        this.defaultDataType = new DataType(this, defaultFormat);
+
+        this.narrowDataTypes = new HashSet<>(Arrays.asList(narrowDataTypes));
     }
 
     private RecordFieldType(final String simpleName, final String defaultFormat) {
         this.simpleName = simpleName;
         this.defaultFormat = defaultFormat;
         this.defaultDataType = new DataType(this, defaultFormat);
+        this.narrowDataTypes = Collections.emptySet();
     }
 
     private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) {
         this.simpleName = simpleName;
         this.defaultFormat = defaultFormat;
         this.defaultDataType = defaultDataType;
+        this.narrowDataTypes = Collections.emptySet();
     }
 
+
     public String getDefaultFormat() {
         return defaultFormat;
     }
@@ -330,6 +348,18 @@ public enum RecordFieldType {
         return new MapDataType(valueDataType);
     }
 
+    /**
+     * Determines whether or not this RecordFieldType is "wider" than the provided type. A type "A" is said to be wider
+     * than another type "B" iff A encompasses all values of B and more. For example, the LONG type is wider than INT, and INT
+     * is wider than SHORT. "Complex" types (MAP, RECORD, ARRAY, CHOICE) are not wider than any other type, and no other type is
+     * wider than a complex type. The STRING type is wider than all types with the exception of complex types.
+     *
+     * @param fieldType the type to compare against
+     * @return <code>true</code> if <code>this</code> is wider than the provided type, <code>false</code> otherwise.
+     */
+    public boolean isWiderThan(final RecordFieldType fieldType) {
+        return narrowDataTypes.contains(fieldType);
+    }
 
     public static RecordFieldType of(final String typeString) {
       return SIMPLE_NAME_MAP.get(typeString);
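
Based on the narrower-type sets declared above, the new isWiderThan() check behaves as follows (illustrative):

    RecordFieldType.LONG.isWiderThan(RecordFieldType.INT);      // true  -- LONG lists INT as a narrower type
    RecordFieldType.INT.isWiderThan(RecordFieldType.LONG);      // false
    RecordFieldType.STRING.isWiderThan(RecordFieldType.DOUBLE); // true  -- STRING is wider than all non-complex types
    RecordFieldType.MAP.isWiderThan(RecordFieldType.STRING);    // false -- complex types are not wider than any type
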
index 46dc447..f9f2569 100644 (file)
@@ -57,7 +57,7 @@ public class ArrayDataType extends DataType {
         }
 
         final ArrayDataType other = (ArrayDataType) obj;
-        return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
+        return Objects.equals(elementType, other.elementType);
     }
 
     @Override
index 5ed1c39..6435195 100644 (file)
@@ -57,11 +57,11 @@ public class MapDataType extends DataType {
         }
 
         final MapDataType other = (MapDataType) obj;
-        return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType);
+        return Objects.equals(valueType, other.valueType);
     }
 
     @Override
     public String toString() {
-        return "MAP[" + valueType + "]";
+        return "MAP<" + valueType + ">";
     }
 }
index f7e9631..af2044a 100644 (file)
@@ -58,7 +58,7 @@ public class RecordDataType extends DataType {
         }
 
         final RecordDataType other = (RecordDataType) obj;
-        return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
+        return Objects.equals(childSchema, other.childSchema);
     }
 
     @Override
index f206a64..d6e4878 100644 (file)
 
 package org.apache.nifi.serialization.record.util;
 
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -31,6 +45,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -41,20 +56,6 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.type.ArrayDataType;
-import org.apache.nifi.serialization.record.type.ChoiceDataType;
-import org.apache.nifi.serialization.record.type.MapDataType;
-import org.apache.nifi.serialization.record.type.RecordDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class DataTypeUtils {
     private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class);
 
@@ -279,7 +280,7 @@ public class DataTypeUtils {
             }
 
             final Map<?, ?> map = (Map<?, ?>) value;
-            final Map<String, Object> coercedValues = new HashMap<>();
+            final Map<String, Object> coercedValues = new LinkedHashMap<>();
 
             for (final Map.Entry<?, ?> entry : map.entrySet()) {
                 final Object keyValue = entry.getKey();
@@ -304,8 +305,214 @@ public class DataTypeUtils {
         throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record for field " + fieldName);
     }
 
+    public static Record toRecord(final Object value, final String fieldName) {
+        return toRecord(value, fieldName, StandardCharsets.UTF_8);
+    }
+
+    public static RecordSchema inferSchema(final Map<String, Object> values, final String fieldName, final Charset charset) {
+        if (values == null) {
+            return null;
+        }
+
+        final List<RecordField> inferredFieldTypes = new ArrayList<>();
+        final Map<String, Object> coercedValues = new LinkedHashMap<>();
+
+        for (final Map.Entry<?, ?> entry : values.entrySet()) {
+            final Object keyValue = entry.getKey();
+            if (keyValue == null) {
+                continue;
+            }
+
+            final String key = keyValue.toString();
+            final Object rawValue = entry.getValue();
+            final DataType inferredDataType = inferDataType(rawValue, RecordFieldType.STRING.getDataType());
+
+            final RecordField recordField = new RecordField(key, inferredDataType, true);
+            inferredFieldTypes.add(recordField);
+
+            final Object coercedValue = convertType(rawValue, inferredDataType, fieldName, charset);
+            coercedValues.put(key, coercedValue);
+        }
+
+        final RecordSchema inferredSchema = new SimpleRecordSchema(inferredFieldTypes);
+        return inferredSchema;
+    }
+
+    public static Record toRecord(final Object value, final String fieldName, final Charset charset) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Record) {
+            return ((Record) value);
+        }
+
+        final List<RecordField> inferredFieldTypes = new ArrayList<>();
+        if (value instanceof Map) {
+            final Map<?, ?> map = (Map<?, ?>) value;
+            final Map<String, Object> coercedValues = new LinkedHashMap<>();
+
+            for (final Map.Entry<?, ?> entry : map.entrySet()) {
+                final Object keyValue = entry.getKey();
+                if (keyValue == null) {
+                    continue;
+                }
+
+                final String key = keyValue.toString();
+                final Object rawValue = entry.getValue();
+                final DataType inferredDataType = inferDataType(rawValue, RecordFieldType.STRING.getDataType());
+
+                final RecordField recordField = new RecordField(key, inferredDataType, true);
+                inferredFieldTypes.add(recordField);
+
+                final Object coercedValue = convertType(rawValue, inferredDataType, fieldName, charset);
+                coercedValues.put(key, coercedValue);
+            }
+
+            final RecordSchema inferredSchema = new SimpleRecordSchema(inferredFieldTypes);
+            return new MapRecord(inferredSchema, coercedValues);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record for field " + fieldName);
+    }
+
+    public static DataType inferDataType(final Object value, final DataType defaultType) {
+        if (value == null) {
+            return defaultType;
+        }
+
+        if (value instanceof String) {
+            return RecordFieldType.STRING.getDataType();
+        }
+
+        if (value instanceof Record) {
+            final RecordSchema schema = ((Record) value).getSchema();
+            return RecordFieldType.RECORD.getRecordDataType(schema);
+        }
+
+        if (value instanceof Number) {
+            if (value instanceof Long) {
+                return RecordFieldType.LONG.getDataType();
+            }
+            if (value instanceof Integer) {
+                return RecordFieldType.INT.getDataType();
+            }
+            if (value instanceof Short) {
+                return RecordFieldType.SHORT.getDataType();
+            }
+            if (value instanceof Byte) {
+                return RecordFieldType.BYTE.getDataType();
+            }
+            if (value instanceof Float) {
+                return RecordFieldType.FLOAT.getDataType();
+            }
+            if (value instanceof Double) {
+                return RecordFieldType.DOUBLE.getDataType();
+            }
+            if (value instanceof BigInteger) {
+                return RecordFieldType.BIGINT.getDataType();
+            }
+        }
+
+        if (value instanceof Boolean) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (value instanceof java.sql.Time) {
+            return RecordFieldType.TIME.getDataType();
+        }
+        if (value instanceof java.sql.Timestamp) {
+            return RecordFieldType.TIMESTAMP.getDataType();
+        }
+        if (value instanceof java.util.Date) {
+            return RecordFieldType.DATE.getDataType();
+        }
+        if (value instanceof Character) {
+            return RecordFieldType.CHAR.getDataType();
+        }
+
+        // A value of a Map could be either a Record or a Map type. In either case, it must have Strings as keys.
+        if (value instanceof Map) {
+            final Map<String, ?> map = (Map<String, ?>) value;
+            return inferRecordDataType(map);
+//            // Check if all types are the same.
+//            if (map.isEmpty()) {
+//                return RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType());
+//            }
+//
+//            Object valueFromMap = null;
+//            Class<?> valueClass = null;
+//            for (final Object val : map.values()) {
+//                if (val == null) {
+//                    continue;
+//                }
+//
+//                valueFromMap = val;
+//                final Class<?> currentValClass = val.getClass();
+//                if (valueClass == null) {
+//                    valueClass = currentValClass;
+//                } else {
+//                    // If we have two elements that are of different types, then we cannot have a Map. Must be a Record.
+//                    if (valueClass != currentValClass) {
+//                        return inferRecordDataType(map);
+//                    }
+//                }
+//            }
+//
+//            // All values appear to be of the same type, so assume that it's a map.
+//            final DataType elementDataType = inferDataType(valueFromMap, RecordFieldType.STRING.getDataType());
+//            return RecordFieldType.MAP.getMapDataType(elementDataType);
+        }
+        if (value instanceof Object[]) {
+            final Object[] array = (Object[]) value;
+
+            DataType mergedDataType = null;
+            for (final Object arrayValue : array) {
+                final DataType inferredDataType = inferDataType(arrayValue, RecordFieldType.STRING.getDataType());
+                mergedDataType = mergeDataTypes(mergedDataType, inferredDataType);
+            }
+
+            if (mergedDataType == null) {
+                mergedDataType = RecordFieldType.STRING.getDataType();
+            }
+
+            return RecordFieldType.ARRAY.getArrayDataType(mergedDataType);
+        }
+        if (value instanceof Iterable) {
+            final Iterable iterable = (Iterable<?>) value;
+
+            DataType mergedDataType = null;
+            for (final Object arrayValue : iterable) {
+                final DataType inferredDataType = inferDataType(arrayValue, RecordFieldType.STRING.getDataType());
+                mergedDataType = mergeDataTypes(mergedDataType, inferredDataType);
+            }
+
+            if (mergedDataType == null) {
+                mergedDataType = RecordFieldType.STRING.getDataType();
+            }
+
+            return RecordFieldType.ARRAY.getArrayDataType(mergedDataType);
+        }
+
+        return defaultType;
+    }
+
+    private static DataType inferRecordDataType(final Map<String, ?> map) {
+        final List<RecordField> fields = new ArrayList<>(map.size());
+        for (final Map.Entry<String, ?> entry : map.entrySet()) {
+            final String key = entry.getKey();
+            final Object value = entry.getValue();
+
+            final DataType dataType = inferDataType(value, RecordFieldType.STRING.getDataType());
+            final RecordField field = new RecordField(key, dataType, true);
+            fields.add(field);
+        }
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        return RecordFieldType.RECORD.getRecordDataType(schema);
+    }
+
     public static boolean isRecordTypeCompatible(final Object value) {
-        return value != null && value instanceof Record;
+        return value instanceof Record;
     }
 
     public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) {
@@ -374,7 +581,7 @@ public class DataTypeUtils {
                 return (Map<String, Object>) value;
             }
 
-            final Map<String, Object> transformed = new HashMap<>();
+            final Map<String, Object> transformed = new LinkedHashMap<>();
             for (final Map.Entry<?, ?> entry : original.entrySet()) {
                 final Object key = entry.getKey();
                 if (key == null) {
@@ -395,7 +602,7 @@ public class DataTypeUtils {
                     + " because Record does not have an associated Schema");
             }
 
-            final Map<String, Object> map = new HashMap<>();
+            final Map<String, Object> map = new LinkedHashMap<>();
             for (final String recordFieldName : recordSchema.getFieldNames()) {
                 map.put(recordFieldName, record.getValue(recordFieldName));
             }
@@ -426,20 +633,20 @@ public class DataTypeUtils {
             if (recordSchema == null) {
                 throw new IllegalTypeConversionException("Cannot convert value of type Record to Map because Record does not have an associated Schema");
             }
-            final Map<String, Object> recordMap = new HashMap<>();
+
+            final Map<String, Object> recordMap = new LinkedHashMap<>();
             for (RecordField field : recordSchema.getFields()) {
                 final DataType fieldDataType = field.getDataType();
                 final String fieldName = field.getFieldName();
                 Object fieldValue = record.getValue(fieldName);
+
                 if (fieldValue == null) {
                     recordMap.put(fieldName, null);
                 } else if (isScalarValue(fieldDataType, fieldValue)) {
                     recordMap.put(fieldName, fieldValue);
-
                 } else if (fieldDataType instanceof RecordDataType) {
                     Record nestedRecord = (Record) fieldValue;
                     recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, fieldDataType));
-
                 } else if (fieldDataType instanceof MapDataType) {
                     recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType)fieldDataType).getValueType()));
 
@@ -452,7 +659,7 @@ public class DataTypeUtils {
             }
             return recordMap;
         } else if (value instanceof Map) {
-            return convertRecordMapToJavaMap((Map)value, ((MapDataType)dataType).getValueType());
+            return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType());
         } else if (dataType != null && isScalarValue(dataType, value)) {
             return value;
         }
@@ -1186,14 +1393,103 @@ public class DataTypeUtils {
             defaultValue = thisField.getDefaultValue();
         }
 
-        final DataType dataType;
-        if (thisField.getDataType().equals(otherField.getDataType())) {
-            dataType = thisField.getDataType();
+        final DataType dataType = mergeDataTypes(thisField.getDataType(), otherField.getDataType());
+        return new RecordField(fieldName, dataType, defaultValue, aliases, thisField.isNullable() || otherField.isNullable());
+    }
+
+    public static DataType mergeDataTypes(final DataType thisDataType, final DataType otherDataType) {
+        if (thisDataType == null) {
+            return otherDataType;
+        }
+
+        if (otherDataType == null) {
+            return thisDataType;
+        }
+
+        if (thisDataType.equals(otherDataType)) {
+            return thisDataType;
         } else {
-            dataType = RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), otherField.getDataType());
+            // If one type is 'wider' than the other (such as an INT and a LONG), just use the wider type (LONG, in this case),
+            // rather than using a CHOICE of the two.
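+            // For example, merging INT with LONG yields LONG, whereas merging STRING with INT falls through
+            // to a CHOICE of both types below.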
+            final Optional<DataType> widerType = getWiderType(thisDataType, otherDataType);
+            if (widerType.isPresent()) {
+                return widerType.get();
+            }
+
+            final Set<DataType> possibleTypes = new LinkedHashSet<>();
+            if (thisDataType.getFieldType() == RecordFieldType.CHOICE) {
+                possibleTypes.addAll(((ChoiceDataType) thisDataType).getPossibleSubTypes());
+            } else {
+                possibleTypes.add(thisDataType);
+            }
+
+            if (otherDataType.getFieldType() == RecordFieldType.CHOICE) {
+                possibleTypes.addAll(((ChoiceDataType) otherDataType).getPossibleSubTypes());
+            } else {
+                possibleTypes.add(otherDataType);
+            }
+
+            return RecordFieldType.CHOICE.getChoiceDataType(new ArrayList<>(possibleTypes));
         }
+    }
 
-        return new RecordField(fieldName, dataType, defaultValue, aliases, thisField.isNullable() || otherField.isNullable());
+    public static Optional<DataType> getWiderType(final DataType thisDataType, final DataType otherDataType) {
+        final RecordFieldType thisFieldType = thisDataType.getFieldType();
+        final RecordFieldType otherFieldType = otherDataType.getFieldType();
+
+        final int thisIntTypeValue = getIntegerTypeValue(thisFieldType);
+        final int otherIntTypeValue = getIntegerTypeValue(otherFieldType);
+        if (thisIntTypeValue > -1 && otherIntTypeValue > -1) {
+            if (thisIntTypeValue > otherIntTypeValue) {
+                return Optional.of(thisDataType);
+            }
+
+            return Optional.of(otherDataType);
+        }
+
+        switch (thisFieldType) {
+            case FLOAT:
+                if (otherFieldType == RecordFieldType.DOUBLE) {
+                    return Optional.of(otherDataType);
+                }
+                break;
+            case DOUBLE:
+                if (otherFieldType == RecordFieldType.FLOAT) {
+                    return Optional.of(thisDataType);
+                }
+                break;
+
+            case CHAR:
+                if (otherFieldType == RecordFieldType.STRING) {
+                    return Optional.of(otherDataType);
+                }
+                break;
+            case STRING:
+                if (otherFieldType == RecordFieldType.CHAR) {
+                    return Optional.of(thisDataType);
+                }
+                break;
+        }
+
+        return Optional.empty();
+    }
+
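+    // Returns a relative 'width' ranking for the integer-family types (BYTE < SHORT < INT < LONG < BIGINT),
+    // or -1 for any other type so that getWiderType() falls through to the remaining comparisons.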
+    private static int getIntegerTypeValue(final RecordFieldType fieldType) {
+        switch (fieldType) {
+            case BIGINT:
+                return 4;
+            case LONG:
+                return 3;
+            case INT:
+                return 2;
+            case SHORT:
+                return 1;
+            case BYTE:
+                return 0;
+            default:
+                return -1;
+        }
     }
 
     public static boolean isScalarValue(final DataType dataType, final Object value) {
index f225bb4..dc36926 100644 (file)
  */
 package org.apache.nifi.remote.io.socket.ssl;
 
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+import org.apache.nifi.security.util.CertificateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -30,18 +43,6 @@ import java.security.cert.Certificate;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.io.socket.BufferStateManager;
-import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
-import org.apache.nifi.security.util.CertificateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class SSLSocketChannel implements Closeable {
 
@@ -582,7 +583,12 @@ public class SSLSocketChannel implements Closeable {
                     continue;
                 }
                 case CLOSED:
-                    throw new IOException("Channel is closed");
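+                    // The channel has been closed by the peer. Return whatever is still buffered in the
+                    // application data buffer, or -1 to signal end-of-stream if nothing remains.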
+                    copied = copyFromAppDataBuffer(buffer, offset, len);
+                    if (copied == 0) {
+                        return -1;
+                    }
+                    streamInManager.compact();
+                    return copied;
                 case OK: {
                     copied = copyFromAppDataBuffer(buffer, offset, len);
                     if (copied == 0) {
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/DateTimeMatcher.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/DateTimeMatcher.java
new file mode 100644 (file)
index 0000000..2e35ec2
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+/**
+ * <p>
+ * A utility interface that can be used to determine whether or not a String matches a given date/time format, as specified
+ * by the Time Format used in {@link java.text.SimpleDateFormat}. It is not uncommon to see code written along the lines of:
+ * </p>
+ *
+ * <code><pre>
+ * final String format = "yyyy/MM/dd HH:mm:ss.SSS";
+ * try {
+ *     new SimpleDateFormat(format).parse(text);
+ *     return true;
+ * } catch (Exception e) {
+ *     return false;
+ * }
+ * </pre></code>
+ *
+ * <p>
+ *     This approach, however, is frowned upon for two important reasons. Firstly, the performance is poor. A micro-benchmark that involves executing
+ *     the above code (even reusing the SimpleDateFormat object) to evaluate whether or not <code>text</code> is a timestamp took approximately 125-130 seconds
+ *     to iterate 1,000,000 times (after discarding the first 1,000,000 iterations as a 'warmup'). As a comparison, this utility takes about 8-11 seconds against
+ *     the same data and on the same machine.
+ * </p>
+ *
+ * <p>
+ *     Secondly, the above snippet has a very expensive side effect of throwing an Exception if the text does not match the format. This Exception is silently ignored,
+ *     but can have devastating effects on the JVM as a whole, as creating the Exception object can result in requiring a Safepoint, which means that all threads in the JVM
+ *     may be forced to pause.
+ * </p>
+ *
+ * <p>
+ *     Note, however, that this class is not intended to replace SimpleDateFormat, as it does not perform the actual parsing but instead only determines whether or not
+ *     a given input text matches the pattern, so that if it does, a SimpleDateFormat can be used to parse the input.
+ * </p>
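+ *
+ * <p>
+ *     As an illustrative sketch (not part of this interface's contract), a caller with some candidate <code>text</code> might use it as follows:
+ * </p>
+ *
+ * <code><pre>
+ * final DateTimeMatcher matcher = DateTimeMatcher.compile("yyyy/MM/dd HH:mm:ss.SSS");
+ * if (matcher.matches(text)) {
+ *     // Only now pay the cost of a real parse; the match above makes a ParseException unlikely, but it must still be handled.
+ *     final Date date = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS").parse(text);
+ * }
+ * </pre></code>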
+ */
+public interface DateTimeMatcher {
+    /**
+     * Determines whether or not the text matches the pattern
+     * @param text the text to evaluate
+     * @return <code>true</code> if the text matches the pattern, <code>false</code> otherwise
+     */
+    boolean matches(String text);
+
+    static DateTimeMatcher compile(String format) {
+        if (format == null) {
+            return t -> false;
+        }
+
+        return new DateTimeMatcherCompiler().compile(format);
+    }
+}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/DateTimeMatcherCompiler.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/DateTimeMatcherCompiler.java
new file mode 100644 (file)
index 0000000..5bf854b
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class DateTimeMatcherCompiler {
+
+    DateTimeMatcher compile(final String format) {
+        final RegexDateTimeMatcher regexMatcher = new RegexDateTimeMatcher.Compiler().compile(format);
+
+        final List<DateTimeMatcher> matchers = new ArrayList<>(4);
+
+        // Add a matcher that will filter out any input if it's too short or too long to match the regex.
+        // This allows us to save on the expense of evaluating the Regular Expression in some cases.
+        final int minLength = regexMatcher.getMinInputLength();
+        final int maxLength = regexMatcher.getMaxInputLength();
+        matchers.add(input -> input.length() >= minLength && input.length() <= maxLength);
+
+        // Look for common patterns in date/time formats that allow us to quickly determine if some input text
+        // will match the pattern. For example, a common date pattern is yyyy/MM/dd or MM/dd/yyyy. In the first
+        // case, we know that it is not going to match unless the first 4 characters of the input are digits.
+        // In the latter case, we know that it will not match if the first 2 characters are not digits.
+        if (format.startsWith("yyyy")) {
+            matchers.add(new StartsWithDigitsDateTimeMatcher(4));
+        } else if (format.startsWith("yy") || format.startsWith("mm")) {
+            matchers.add(new StartsWithDigitsDateTimeMatcher(2));
+        } else if (format.startsWith("H") || format.startsWith("h")) {
+            matchers.add(new StartsWithDigitsDateTimeMatcher(1));
+        } else if (format.startsWith("M") && !format.startsWith("MMM")) {
+            // If the format begins with M, it could be either a numeric month or a month name, so we check whether it starts with at least 3 M's to determine which.
+            matchers.add(new StartsWithDigitsDateTimeMatcher(1));
+        }
+
+        matchers.add(regexMatcher);
+
+        // Use the SimpleDateFormatMatcher only after our regex has matched. That way, the relatively expensive parse is performed solely to confirm the match, guaranteeing that we are correct when we say the input text matches.
+        matchers.add(new SimpleDateFormatMatcher(format));
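+
+        // The resulting matcher evaluates the delegates in the order added (cheapest first) and short-circuits
+        // on the first delegate that does not match.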
+        return new ListDateTimeMatcher(matchers);
+    }
+}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/ListDateTimeMatcher.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/ListDateTimeMatcher.java
new file mode 100644 (file)
index 0000000..dd3e0d0
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An implementation of DateTimeMatcher that accepts a List of delegate DateTimeMatchers in its constructor.
+ * This matcher returns <code>true</code> if and only if ALL of the delegate matchers return <code>true</code> for
+ * the same input.
+ */
+class ListDateTimeMatcher implements DateTimeMatcher {
+    private final List<DateTimeMatcher> matchers;
+
+    public ListDateTimeMatcher(final List<DateTimeMatcher> matchers) {
+        this.matchers = new ArrayList<>(matchers);
+    }
+
+    @Override
+    public boolean matches(final String text) {
+        for (final DateTimeMatcher matcher : matchers) {
+            if (!matcher.matches(text)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java
new file mode 100644 (file)
index 0000000..fe1b919
--- /dev/null
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+import java.text.DateFormatSymbols;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexDateTimeMatcher implements DateTimeMatcher {
+    private final Pattern pattern;
+    private final List<String> subPatterns;
+    private final int minLength;
+    private final int maxLength;
+
+    private RegexDateTimeMatcher(final Pattern pattern, final List<String> subPatterns, final int minLength, final int maxLength) {
+        this.pattern = pattern;
+        this.subPatterns = subPatterns;
+        this.minLength = minLength;
+        this.maxLength = maxLength;
+    }
+
+    public int getMinInputLength() {
+        return minLength;
+    }
+
+    public int getMaxInputLength() {
+        return maxLength;
+    }
+
+    @Override
+    public boolean matches(final String text) {
+        if (text.length() < minLength || text.length() > maxLength) {
+            return false;
+        }
+
+        return pattern.matcher(text).matches();
+    }
+
+    // This method is not used except to aid debugging. If a pattern is not matching a given input, this can be used
+    // to help determine which part of the compiled regular expression is not matching the input.
+    public String determineMismatch(final String text) {
+        for (int patternsToUse = subPatterns.size() - 1; patternsToUse >= 0; patternsToUse--) {
+            final StringBuilder sb = new StringBuilder();
+
+            for (int i=0; i < patternsToUse; i++) {
+                sb.append(subPatterns.get(i));
+            }
+
+            final String regex = "^" + sb.toString();
+            final Pattern pattern = Pattern.compile(regex);
+            final Matcher matcher = pattern.matcher(text);
+            final boolean found = matcher.find();
+            if (found) {
+                return "Longest Match: <" + matcher.group(0) + "> based on pattern <" + regex + ">. The following portions did not match: " + subPatterns.subList(patternsToUse, subPatterns.size());
+            }
+        }
+
+        return "Could not match any part of the pattern";
+    }
+
+
+    public static class Compiler {
+        private final List<String> patterns = new ArrayList<>();
+
+        private char currentPattern;
+        private int charCount;
+        private boolean patternStarted = false;
+
+        private static final String AMPM_PATTERN;
+        private static final String ERAS_PATTERN;
+        private static final String MONTHS_PATTERN;
+        private static final String LONG_WEEKDAY_PATTERN;
+        private static final String SHORT_WEEKDAY_PATTERN;
+        private static final String ZONE_NAME_PATTERN;
+
+        private static final LengthRange AMPM_RANGE;
+        private static final LengthRange ERAS_RANGE;
+        private static final LengthRange MONTH_NAME_RANGE;
+        private static final LengthRange LONG_WEEKDAY_RANGE;
+        private static final LengthRange SHORT_WEEKDAY_RANGE;
+        private static final LengthRange ZONE_NAME_RANGE;
+
+        private LengthRange range = new LengthRange(0, 0);
+
+        static {
+            final DateFormatSymbols dateFormatSymbols = DateFormatSymbols.getInstance(Locale.US);
+
+            final String[] ampm = dateFormatSymbols.getAmPmStrings();
+            AMPM_PATTERN = joinRegex(ampm);
+            AMPM_RANGE = lengthRange(ampm);
+
+            final String[] eras = dateFormatSymbols.getEras();
+            ERAS_PATTERN = joinRegex(eras);
+            ERAS_RANGE = lengthRange(eras);
+
+            final List<String> monthNames = new ArrayList<>();
+            monthNames.addAll(Arrays.asList(dateFormatSymbols.getMonths()));
+            monthNames.addAll(Arrays.asList(dateFormatSymbols.getShortMonths()));
+            final String[] monthNameArray = monthNames.toArray(new String[0]);
+            MONTHS_PATTERN = joinRegex(monthNameArray);
+            MONTH_NAME_RANGE = lengthRange(monthNameArray);
+
+            final String[] longWeekdays = dateFormatSymbols.getWeekdays();
+            LONG_WEEKDAY_PATTERN = joinRegex(longWeekdays);
+            LONG_WEEKDAY_RANGE = lengthRange(longWeekdays);
+
+            final String[] shortWeekdays = dateFormatSymbols.getShortWeekdays();
+            SHORT_WEEKDAY_PATTERN = joinRegex(shortWeekdays);
+            SHORT_WEEKDAY_RANGE = lengthRange(shortWeekdays);
+
+            int maxTimeZoneLength = 0;
+            final String[][] zoneStrings = dateFormatSymbols.getZoneStrings();
+            final StringBuilder zoneNamePatternBuilder = new StringBuilder();
+            for (final String[] zoneNames : zoneStrings) {
+                for (final String zoneName : zoneNames) {
+                    zoneNamePatternBuilder.append(Pattern.quote(zoneName)).append("|");
+
+                    maxTimeZoneLength = Math.max(maxTimeZoneLength, zoneName.length());
+                }
+            }
+
+            zoneNamePatternBuilder.deleteCharAt(zoneNamePatternBuilder.length() - 1);
+            ZONE_NAME_PATTERN = zoneNamePatternBuilder.toString();
+            ZONE_NAME_RANGE = new LengthRange(1, maxTimeZoneLength);
+        }
+
+        public RegexDateTimeMatcher compile(final String format) {
+            currentPattern = 0;
+            charCount = 0;
+
+            char lastChar = 0;
+
+            for (int i = 0; i < format.length(); i++) {
+                final char c = format.charAt(i);
+
+                if (c != lastChar) {
+                    endPattern();
+                } else {
+                    charCount++;
+                }
+
+                try {
+                    switch (c) {
+                        case '\'':
+                            i = copyText(format, i);
+                            break;
+                        default:
+                            if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
+                                if (c != lastChar) {
+                                    beginPattern(c);
+                                }
+
+                                continue;
+                            }
+                            appendChar(c);
+                            break;
+                    }
+                } finally {
+                    lastChar = c;
+                }
+            }
+
+            endPattern();
+
+            final StringBuilder sb = new StringBuilder();
+            for (final String pattern : patterns) {
+                sb.append(pattern);
+            }
+
+            final String regex = sb.toString();
+            final Pattern pattern = Pattern.compile(regex);
+            return new RegexDateTimeMatcher(pattern, patterns, range.getMinLength(), range.getMaxLength());
+        }
+
+
+        private static LengthRange lengthRange(final String[] values) {
+            return new LengthRange(minLength(values), maxLength(values));
+        }
+
+        private static int minLength(final String[] values) {
+            if (values.length == 0) {
+                return 0;
+            }
+
+            int minLength = values[0].length();
+            for (final String value : values) {
+                minLength = Math.min(minLength, value.length());
+            }
+
+            return minLength;
+        }
+
+        private static int maxLength(final String[] values) {
+            if (values.length == 0) {
+                return 0;
+            }
+
+            int maxLength = values[0].length();
+            for (final String value : values) {
+                maxLength = Math.max(maxLength, value.length());
+            }
+
+            return maxLength;
+        }
+
+        private static String joinRegex(final String[] values) {
+            final StringBuilder sb = new StringBuilder("(?:");
+
+            for (final String value : values) {
+                sb.append(Pattern.quote(value)).append("|");
+            }
+
+            sb.deleteCharAt(sb.length() - 1);
+            sb.append(")");
+            return sb.toString();
+        }
+
+        private int copyText(final String formatString, final int startChar) {
+            boolean lastCharQuote = false;
+
+            final StringBuilder textBuilder = new StringBuilder();
+
+            try {
+                for (int i = startChar + 1; i < formatString.length(); i++) {
+                    final char c = formatString.charAt(i);
+                    if (c == '\'') {
+                        // We found a quote char. If the last character is also a quote, then it was an escape character. Copy a single quote, set lastCharQuote = false because we've finished
+                        // the escape sequence, and then continue to the next character.
+                        if (lastCharQuote) {
+                            textBuilder.append("'");
+                            lastCharQuote = false;
+                            continue;
+                        }
+
+                        // We found a quote character. The last character is not a quote. This character may or may not be an escape character, so we have to move on to the next character to find out.
+                        lastCharQuote = true;
+                        continue;
+                    } else if (lastCharQuote) {
+                        // The current character is not a quote character but the last character was. This means that the last character was ending the quotation.
+                        return i - 1;
+                    }
+
+                    textBuilder.append(c);
+                    lastCharQuote = false;
+                }
+
+                return formatString.length();
+            } finally {
+                if (textBuilder.length() == 0) {
+                    patterns.add("'");
+                } else {
+                    final String text = textBuilder.toString();
+                    if (text.length() > 0) {
+                        patterns.add(Pattern.quote(textBuilder.toString()));
+                    }
+                }
+            }
+        }
+
+        private void beginPattern(final char c) {
+            this.patternStarted = true;
+            this.charCount = 1;
+            this.currentPattern = c;
+        }
+
+        private void appendChar(final char c) {
+            patterns.add(Pattern.quote(String.valueOf(c)));
+            range = range.plus(1, 1);
+        }
+
+        private void endPattern() {
+            if (!patternStarted) {
+                return;
+            }
+
+            patternStarted = false;
+            switch (currentPattern) {
+                case 'G':
+                    addEraDesignator();
+                    break;
+                case 'y':
+                case 'Y':
+                    if (this.charCount == 2) {
+                        addYear(2);
+                    } else {
+                        addYear(this.charCount);
+                    }
+                    break;
+                case 'M':
+                    if (this.charCount <= 2) {
+                        addShortMonth();
+                    } else {
+                        addLongMonth();
+                    }
+                    break;
+                case 'w':
+                    addWeekInYear();
+                    break;
+                case 'W':
+                    addWeekInMonth();
+                    break;
+                case 'D':
+                    addDayInYear();
+                    break;
+                case 'd':
+                    addDayInMonth();
+                    break;
+                case 'F':
+                    addDayOfWeekInMonth();
+                    break;
+                case 'E':
+                    if (this.charCount <= 3) {
+                        addShortDayNameInWeek();
+                    } else {
+                        addLongDayNameInWeek();
+                    }
+                    break;
+                case 'u':
+                    addDayNumberInWeek();
+                    break;
+                case 'a':
+                    addAmPmMarker();
+                    break;
+                case 'H':
+                    addHourInDayBaseZero();
+                    break;
+                case 'k':
+                    addHourInDayBaseOne();
+                    break;
+                case 'K':
+                    add12HourBaseZero();
+                    break;
+                case 'h':
+                    add12HourBaseOne();
+                    break;
+                case 'm':
+                    addMinuteInHour();
+                    break;
+                case 's':
+                    addSecondInMinute();
+                    break;
+                case 'S':
+                    addMillisecond();
+                    break;
+                case 'z':
+                    addGeneralTimeZone();
+                    break;
+                case 'Z':
+                    addRFC822TimeZone();
+                    break;
+                case 'X':
+                    addISO8601TimeZone();
+                    break;
+            }
+        }
+
+        private void addEraDesignator() {
+            patterns.add(ERAS_PATTERN);
+            range = range.plus(ERAS_RANGE);
+        }
+
+        private void addYear(final int maxDigits) {
+            patterns.add("(?:-?\\d{1," + maxDigits + "})");
+            range = range.plus(1, maxDigits);
+        }
+
+        private void addShortMonth() {
+            patterns.add("(?:0[1-9]|1[0-2])");
+            range = range.plus(1, 2);
+        }
+
+        private void addLongMonth() {
+            patterns.add("(?:" + MONTHS_PATTERN + ")");
+            range = range.plus(MONTH_NAME_RANGE);
+        }
+
+        private void addWeekInYear() {
+            patterns.add("\\d{1,2}");
+            range = range.plus(1, 4);
+        }
+
+        private void addWeekInMonth() {
+            patterns.add("[0-4]");
+            range = range.plus(1, 1);
+        }
+
+        private void addDayInYear() {
+            patterns.add("\\d{1,3}");
+            range = range.plus(1, 3);
+        }
+
+        private void addDayInMonth() {
+            patterns.add("[0-3]?[0-9]");
+            range = range.plus(1, 2);
+        }
+
+        private void addDayOfWeekInMonth() {
+            patterns.add("[0-7]");
+            range = range.plus(1, 1);
+        }
+
+        private void addShortDayNameInWeek() {
+            patterns.add(SHORT_WEEKDAY_PATTERN);
+            range = range.plus(SHORT_WEEKDAY_RANGE);
+        }
+
+        private void addLongDayNameInWeek() {
+            patterns.add(LONG_WEEKDAY_PATTERN);
+            range = range.plus(LONG_WEEKDAY_RANGE);
+        }
+
+        private void addDayNumberInWeek() {
+            patterns.add("[1-7]");
+            range = range.plus(1, 1);
+        }
+
+        private void addAmPmMarker() {
+            patterns.add(AMPM_PATTERN);
+            range = range.plus(AMPM_RANGE);
+        }
+
+        private void addHourInDayBaseZero() {
+            patterns.add("(?:[0-9]|[01][0-9]|2[0-3])");
+            range = range.plus(1, 2);
+        }
+
+        private void addHourInDayBaseOne() {
+            patterns.add("(?:[1-9]|0[1-9]|1[0-9]|2[0-4])");
+            range = range.plus(1, 2);
+        }
+
+        private void add12HourBaseZero() {
+            patterns.add("(?:0?[0-9]|1[01])");
+            range = range.plus(1, 2);
+        }
+
+        private void add12HourBaseOne() {
+            patterns.add("(?:[1-9]|0[1-9]|1[012])");
+            range = range.plus(1, 2);
+        }
+
+        private void addMinuteInHour() {
+            patterns.add("(?:[0-9]|[0-5][0-9])");
+            range = range.plus(1, 2);
+        }
+
+        private void addSecondInMinute() {
+            addMinuteInHour(); // Same pattern
+            range = range.plus(1, 2);
+        }
+
+        private void addMillisecond() {
+            patterns.add("\\d{1,3}");
+            range = range.plus(1, 3);
+        }
+
+        private void addGeneralTimeZone() {
+            final StringBuilder sb = new StringBuilder();
+
+            sb.append("(?:"); // begin non-capturing group
+            sb.append(getGMTOffsetTimeZone());
+            sb.append("|");
+            sb.append(getNamedTimeZone());
+            sb.append(")"); // end non-capturing group
+
+            patterns.add(sb.toString());
+            range = range.plus(ZONE_NAME_RANGE);
+        }
+
+        private String getGMTOffsetTimeZone() {
+            // From SimpleDateFormat JavaDocs, GMTOffsetTimeZone defined as: GMT Sign Hours : Minutes
+            // Sign defined as '-' or '+'
+            // Hours defined as 1 or 2 digits, Minutes defined as 2 digits
+            // Digit defined as number between 0-9
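+            // For example: "GMT-05:00" or "GMT+9:30"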
+            return "(?:GMT[-+]\\d{1,2}:\\d{2})";
+        }
+
+        private String getNamedTimeZone() {
+            return ZONE_NAME_PATTERN;
+        }
+
+        private void addRFC822TimeZone() {
+            patterns.add("(?:[-+]\\d{4})");
+            range = range.plus(5, 5);
+        }
+
+        private void addISO8601TimeZone() {
+            patterns.add("(?:Z|(?:[-+](?:\\d{2}|\\d{4}|\\d{2}\\:\\d{2})))");
+            range = range.plus(1, 6);
+        }
+
+
+        private static class LengthRange {
+            private final int min;
+            private final int max;
+
+            public LengthRange(final int min, final int max) {
+                this.min = min;
+                this.max = max;
+            }
+
+            public int getMinLength() {
+                return min;
+            }
+
+            public int getMaxLength() {
+                return max;
+            }
+
+            public LengthRange plus(final LengthRange other) {
+                return new LengthRange(getMinLength() + other.getMinLength(), getMaxLength() + other.getMaxLength());
+            }
+
+            public LengthRange plus(final int min, final int max) {
+                return new LengthRange(getMinLength() + min, getMaxLength() + max);
+            }
+
+        }
+    }
+}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/SimpleDateFormatMatcher.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/SimpleDateFormatMatcher.java
new file mode 100644 (file)
index 0000000..721d140
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+class SimpleDateFormatMatcher implements DateTimeMatcher {
+    private final DateFormat dateFormat;
+
+    public SimpleDateFormatMatcher(final String format) {
+        this.dateFormat = new SimpleDateFormat(format);
+    }
+
+    @Override
+    public boolean matches(final String text) {
+        try {
+            dateFormat.parse(text);
+            return true;
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/StartsWithDigitsDateTimeMatcher.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/StartsWithDigitsDateTimeMatcher.java
new file mode 100644 (file)
index 0000000..2ccbac1
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+/**
+ * A DateTimeMatcher that bases its decision purely on whether or not the first X characters of the input text are digits.
+ */
+public class StartsWithDigitsDateTimeMatcher implements DateTimeMatcher {
+    private final int expectedNumberOfDigits;
+
+    public StartsWithDigitsDateTimeMatcher(final int expectedNumberOfDigits) {
+        this.expectedNumberOfDigits = expectedNumberOfDigits;
+    }
+
+    @Override
+    public boolean matches(final String text) {
+        if (text.length() < expectedNumberOfDigits) {
+            return false;
+        }
+
+        for (int i=0; i < expectedNumberOfDigits; i++) {
+            if (!Character.isDigit(text.charAt(i))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/text/TestRegexDateTimeMatcher.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/text/TestRegexDateTimeMatcher.java
new file mode 100644 (file)
index 0000000..4a5e168
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.text;
+
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestRegexDateTimeMatcher {
+
+    @Test
+    public void testCommonFormatsExpectedToPass() {
+        final Map<String, String> exampleToPattern = new LinkedHashMap<>();
+
+        // Following examples are intended to test specific functions in the regex generation.
+        exampleToPattern.put("2018-12-12", "yyyy-MM-dd");
+        exampleToPattern.put("2018/12/12", "yyyy/MM/dd");
+        exampleToPattern.put("12/12/2018", "MM/dd/yyyy");
+        exampleToPattern.put("12/12/18", "MM/dd/yy");
+        exampleToPattern.put("1:40:55", "HH:mm:ss");
+        exampleToPattern.put("01:0:5", "HH:mm:ss");
+        exampleToPattern.put("12/12/2018 13:04:08 GMT-05:00", "MM/dd/yyyy HH:mm:ss z");
+        exampleToPattern.put("12/12/2018 13:04:08 -0500", "MM/dd/yyyy HH:mm:ss Z");
+        exampleToPattern.put("12/12/2018 13:04:08 EST", "MM/dd/yyyy HH:mm:ss zzzz");
+        exampleToPattern.put("12/12/2018 13:04:08 -05", "MM/dd/yyyy HH:mm:ss X");
+        exampleToPattern.put("0:08 PM", "K:mm a");
+        exampleToPattern.put("Dec 12, 2018", "MMM dd, yyyy");
+        exampleToPattern.put("12 Dec 2018", "dd MMM yyyy");
+        exampleToPattern.put("12 December 2018", "dd MMM yyyy");
+
+        // TODO: The following examples are taken from the SimpleDateFormat's JavaDoc. Ensure that this is not a licensing concern,
+        // since it is not being distributed.
+        exampleToPattern.put("2001.07.04 AD at 12:08:56 PDT", "yyyy.MM.dd G 'at' HH:mm:ss z");
+        exampleToPattern.put("Wed, Jul 4, '01", "EEE, MMM d, ''yy");
+        exampleToPattern.put("12:08 PM", "h:mm a");
+        exampleToPattern.put("12 o'clock PM, Pacific Daylight Time", "hh 'o''clock' a, zzzz");
+        exampleToPattern.put("0:08 PM, PDT", "K:mm a, z");
+        exampleToPattern.put("02001.July.04 AD 12:08 PM", "yyyyy.MMMMM.dd GGG hh:mm aaa");
+        exampleToPattern.put("Wed, 4 Jul 2001 12:08:56 -0700", "EEE, d MMM yyyy HH:mm:ss Z");
+        exampleToPattern.put("010704120856-0700", "yyMMddHHmmssZ");
+        exampleToPattern.put("2001-07-04T12:08:56.235-0700", "yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+        exampleToPattern.put("2001-07-04T12:08:56.235-07:00", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+        exampleToPattern.put("2001-W27-3", "YYYY-'W'ww-u");
+
+        for (final Map.Entry<String, String> entry : exampleToPattern.entrySet()) {
+            final RegexDateTimeMatcher matcher = new RegexDateTimeMatcher.Compiler().compile(entry.getValue());
+            final boolean matches = matcher.matches(entry.getKey());
+
+            assertTrue("Pattern <" + entry.getValue() + "> did not match <" + entry.getKey() + ">", matches);
+        }
+    }
+}
index f48b919..78ff117 100644 (file)
  */
 package org.apache.nifi.util;
 
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.FlowFileHandlingException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.junit.Assert;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
@@ -43,22 +59,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.FlowFileAccessException;
-import org.apache.nifi.processor.exception.FlowFileHandlingException;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.provenance.ProvenanceReporter;
-import org.junit.Assert;
-
 public class MockProcessSession implements ProcessSession {
 
     private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
@@ -607,6 +607,16 @@ public class MockProcessSession implements ProcessSession {
             }
 
             @Override
+            public void mark(final int readlimit) {
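+                // Delegate to the underlying ByteArrayInputStream so that callers can later reset() to this point.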
+                bais.mark(readlimit);
+            }
+
+            @Override
+            public void reset() {
+                bais.reset();
+            }
+
+            @Override
             public String toString() {
                 return "ErrorHandlingInputStream[flowFile=" + mock + "]";
             }
index 2e2c8f7..043f7ab 100755 (executable)
@@ -119,7 +119,17 @@ public class AvroTypeUtil {
 
     private static Field buildAvroField(final RecordField recordField) {
         final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable());
-        final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue());
+
+        final Field field;
+        final String recordFieldName = recordField.getFieldName();
+        if (isValidAvroFieldName(recordFieldName)) {
+            field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue());
+        } else {
+            final String validName = createValidAvroFieldName(recordField.getFieldName());
+            field = new Field(validName, schema, null, recordField.getDefaultValue());
+            field.addAlias(recordField.getFieldName());
+        }
+
         for (final String alias : recordField.getAliases()) {
             field.addAlias(alias);
         }
@@ -127,6 +137,56 @@ public class AvroTypeUtil {
         return field;
     }
 
+    private static boolean isValidAvroFieldName(final String fieldName) {
+        // Avro field names must match the following criteria:
+        // 1. Must be non-empty
+        // 2. Must begin with a letter or an underscore
+        // 3. Must consist only of letters, underscores, and numbers.
+        if (fieldName.isEmpty()) {
+            return false;
+        }
+
+        final char firstChar = fieldName.charAt(0);
+        if (firstChar != '_' && !Character.isLetter(firstChar)) {
+            return false;
+        }
+
+        for (int i=1; i < fieldName.length(); i++) {
+            final char c = fieldName.charAt(i);
+            if (c != '_' && !Character.isLetterOrDigit(c)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
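+    // Builds a name that satisfies Avro's naming rules by replacing each invalid character with an underscore.
+    // For example, "$invalid2" becomes "_invalid2" and "3invalid3" becomes "_invalid3".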
+    private static String createValidAvroFieldName(final String fieldName) {
+        if (fieldName.isEmpty()) {
+            return "UNNAMED_FIELD";
+        }
+
+        final StringBuilder sb = new StringBuilder();
+
+        final char firstChar = fieldName.charAt(0);
+        if (firstChar == '_' || Character.isLetter(firstChar)) {
+            sb.append(firstChar);
+        } else {
+            sb.append("_");
+        }
+
+        for (int i=1; i < fieldName.length(); i++) {
+            final char c = fieldName.charAt(i);
+            if (c == '_' || Character.isLetterOrDigit(c)) {
+                sb.append(c);
+            } else {
+                sb.append("_");
+            }
+        }
+
+        return sb.toString();
+    }
+
     private static Schema buildAvroSchema(final DataType dataType, final String fieldName, final boolean nullable) {
         final Schema schema;
 
@@ -462,7 +522,7 @@ public class AvroTypeUtil {
         Field field = avroSchema.getField(fieldName);
         if (field == null) {
             // No straight mapping was found, so check the aliases to see if it can be mapped
-            for(final String alias: recordField.getAliases()) {
+            for (final String alias : recordField.getAliases()) {
                 field = avroSchema.getField(alias);
                 if (field != null) {
                     fieldName = alias;
@@ -471,6 +531,28 @@ public class AvroTypeUtil {
             }
         }
 
+        if (field == null) {
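+            // The Avro schema may have renamed an invalid field name (see buildAvroField) and recorded the
+            // original name only as an alias, so scan every field's aliases for a match before giving up.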
+            for (final Field childField : avroSchema.getFields()) {
+                final Set<String> aliases = childField.aliases();
+                if (aliases.isEmpty()) {
+                    continue;
+                }
+
+                if (aliases.contains(fieldName)) {
+                    field = childField;
+                    break;
+                }
+
+                for (final String alias : recordField.getAliases()) {
+                    if (aliases.contains(alias)) {
+                        field = childField;
+                        fieldName = alias;
+                        break;
+                    }
+                }
+            }
+        }
+
         return new ImmutablePair<>(fieldName, field);
     }
 
@@ -493,7 +575,7 @@ public class AvroTypeUtil {
             }
 
             final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
-            rec.put(fieldName, converted);
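+            // Use the Avro field's actual name, which may differ from the Record field name if it was sanitized.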
+            rec.put(field.name(), converted);
         }
 
         // see if the Avro schema has any fields that aren't in the RecordSchema, and if those fields have a default
index b3819cf..73d4185 100644 (file)
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.AvroSchemaTextStrategy;
 import org.apache.nifi.schema.access.InferenceSchemaStrategy;
@@ -60,7 +61,7 @@ public class JsonInferenceSchemaRegistryService extends SchemaRegistryService {
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
         if (strategy == null) {
             return null;
         }
@@ -102,7 +103,7 @@ public class JsonInferenceSchemaRegistryService extends SchemaRegistryService {
         if (schemaAccess.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue()) || schemaAccess.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
             return getSchema(variables, readSchema);
         } else {
-            return ((JsonSchemaAccessStrategy)schemaAccessStrategy).getSchema(variables, content, readSchema);
+            return ((JsonSchemaAccessStrategy) schemaAccessStrategy).getSchema(variables, content, readSchema);
         }
     }
 }
index 6923f48..1beb50e 100644 (file)
@@ -22,6 +22,7 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
@@ -43,17 +44,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
 
 public abstract class SchemaRegistryService extends AbstractControllerService {
 
@@ -151,15 +152,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
         return suppliedFields;
     }
 
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (strategy == null) {
-            return null;
-        }
-
-        return SchemaAccessUtils.getSchemaAccessStrategy(strategy, schemaRegistry, context);
-    }
-
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
         if (allowableValue == null) {
             return null;
         }
index 1f6c29b..004999f 100755 (executable)
 
 package org.apache.nifi.avro;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -41,11 +24,11 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.generic.GenericData.Record;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
@@ -56,6 +39,25 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TestAvroTypeUtil {
 
     @Test
@@ -450,4 +452,40 @@ public class TestAvroTypeUtil {
         assertTrue(o instanceof String);
         assertEquals("Hello", o);
     }
+
+    @Test
+    public void testAliasCreatedForInvalidField() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("valid", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("$invalid2", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("3invalid3", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("  __ Another ONE!!", RecordFieldType.STRING.getDataType()));
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
+        assertNotNull(avroSchema.getField("valid"));
+
+        assertNull(avroSchema.getField("$invalid2"));
+        final Field field2 = avroSchema.getField("_invalid2");
+        assertNotNull(field2);
+        assertEquals("_invalid2", field2.name());
+        assertEquals(1, field2.aliases().size());
+        assertTrue(field2.aliases().contains("$invalid2"));
+
+        assertNull(avroSchema.getField("3invalid3"));
+        final Field field3 = avroSchema.getField("_invalid3");
+        assertNotNull(field3);
+        assertEquals("_invalid3", field3.name());
+        assertEquals(1, field3.aliases().size());
+        assertTrue(field3.aliases().contains("3invalid3"));
+
+        assertNull(avroSchema.getField("  __ Another ONE!!"));
+        final Field field4 = avroSchema.getField("_____Another_ONE__");
+        assertNotNull(field4);
+        assertEquals("_____Another_ONE__", field4.name());
+        assertEquals(1, field4.aliases().size());
+        assertTrue(field4.aliases().contains("  __ Another ONE!!"));
+
+    }
 }
index 22f61b2..62e7231 100644 (file)
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,6 +48,20 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * Base class for processors that write Records to HDFS.
  */
@@ -302,12 +301,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 final StopWatch stopWatch = new StopWatch(true);
 
                 // Read records from the incoming FlowFile and write them the tempFile
-                session.read(putFlowFile, (final InputStream rawIn) -> {
+                session.read(putFlowFile, (final InputStream in) -> {
                     RecordReader recordReader = null;
                     HDFSRecordWriter recordWriter = null;
 
-                    try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-
+                    try {
                         // if we fail to create the RecordReader then we want to route to failure, so we need to
                         // handle this separately from the other IOExceptions which normally route to retry
                         try {
index 873297e..12a3f3e 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
@@ -58,7 +59,8 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
             throw new IllegalStateException("Cannot create RecordReader because already created");
         }
 
-        final InputStream in = new SSLSocketChannelInputStream(sslSocketChannel);
+        final InputStream socketIn = new SSLSocketChannelInputStream(sslSocketChannel);
+        final InputStream in = new BufferedInputStream(socketIn);
         recordReader = readerFactory.createRecordReader(flowFile, in, logger);
         return recordReader;
     }
index 216449c..071a052 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.io.ContentClaimInputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
@@ -144,7 +145,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private long contentSizeIn = 0L, contentSizeOut = 0L;
 
     private ContentClaim currentReadClaim = null;
-    private ByteCountingInputStream currentReadClaimStream = null;
+    private ContentClaimInputStream currentReadClaimStream = null;
     private long processingStartTime;
 
     // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed
@@ -2183,36 +2184,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 }
 
                 claimCache.flush(claim);
-                final InputStream rawInStream = context.getContentRepository().read(claim);
 
                 if (currentReadClaimStream != null) {
                     currentReadClaimStream.close();
                 }
 
                 currentReadClaim = claim;
-
-                currentReadClaimStream = new ByteCountingInputStream(rawInStream);
-                StreamUtils.skip(currentReadClaimStream, offset);
+                currentReadClaimStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
 
                 // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
                 // reuse the same InputStream for the next FlowFile
                 final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream);
-
                 return disableOnClose;
             } else {
                 claimCache.flush(claim);
-                final InputStream rawInStream = context.getContentRepository().read(claim);
-                try {
-                    StreamUtils.skip(rawInStream, offset);
-                } catch(IOException ioe) {
-                    try {
-                        rawInStream.close();
-                    } catch (final Exception e) {
-                        ioe.addSuppressed(ioe);
-                    }
 
-                    throw ioe;
-                }
+                final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
                 return rawInStream;
             }
         } catch (final ContentNotFoundException cnfe) {
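
The "non-closeable stream" mentioned in the comment above is what allows the session to keep a single ContentClaimInputStream open after the callback returns and reuse it for the next FlowFile stored in the same Content Claim. A minimal sketch of such a wrapper, using a hypothetical name (the real DisableOnCloseInputStream in nifi-utils may differ in detail):

    import java.io.FilterInputStream;
    import java.io.InputStream;

    // Sketch only: a wrapper whose close() is a no-op, so the user callback cannot
    // close the underlying claim stream. The owner closes the delegate explicitly
    // when it switches to a different Content Claim or when the session ends.
    class NonClosingInputStream extends FilterInputStream {
        NonClosingInputStream(final InputStream delegate) {
            super(delegate);
        }

        @Override
        public void close() {
            // intentionally empty; see the comment above
        }
    }
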
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
new file mode 100644 (file)
index 0000000..94b9d2e
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.io;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream that is provided a Content Repository, Content Claim, and offset into the Content Claim where a FlowFile's
+ * content begins, and is capable of reading the content from the Content Repository, as well as providing the ability to use
+ * {@link #mark(int)}/{@link #reset()} in order to re-read content without buffering it.
+ */
+public class ContentClaimInputStream extends InputStream {
+    private final ContentRepository contentRepository;
+    private final ContentClaim contentClaim;
+    private final long claimOffset;
+
+    private InputStream delegate;
+    private long bytesConsumed;
+    private long currentOffset; // offset into the Content Claim; will differ from bytesConsumed if reset() is called after reading at least one byte or if claimOffset > 0
+    private long markOffset;
+
+    public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) {
+        this.contentRepository = contentRepository;
+        this.contentClaim = contentClaim;
+        this.claimOffset = claimOffset;
+
+        this.currentOffset = claimOffset;
+    }
+
+    private InputStream getDelegate() throws IOException {
+        if (delegate == null) {
+            formDelegate();
+        }
+
+        return delegate;
+    }
+
+    public long getBytesConsumed() {
+        return bytesConsumed;
+    }
+
+    public long getCurrentOffset() {
+        return currentOffset;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int value = getDelegate().read();
+        if (value != -1) {
+            bytesConsumed++;
+            currentOffset++;
+        }
+
+        return value;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        final int count = getDelegate().read(b);
+        if (count != -1) {
+            bytesConsumed += count;
+            currentOffset += count;
+        }
+
+        return count;
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        final int count = getDelegate().read(b, off, len);
+        if (count != -1) {
+            bytesConsumed += count;
+            currentOffset += count;
+        }
+
+        return count;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long count = getDelegate().skip(n);
+        if (count > 0) {
+            bytesConsumed += count;
+            currentOffset += count;
+        }
+
+        return count;
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (delegate == null) {
+            return 0;
+        }
+
+        return delegate.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    @Override
+    public void mark(final int readlimit) {
+        markOffset = currentOffset;
+    }
+
+    @Override
+    public void reset() throws IOException {
+        if (markOffset < 0) {
+            throw new IOException("Stream has not been marked");
+        }
+
+        if (currentOffset != markOffset) {
+            delegate.close();
+            formDelegate();
+            StreamUtils.skip(delegate, markOffset - claimOffset);
+            currentOffset = markOffset;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (delegate != null) {
+            delegate.close();
+        }
+    }
+
+    private void formDelegate() throws IOException {
+        if (delegate != null) {
+            delegate.close();
+        }
+
+        delegate = contentRepository.read(contentClaim);
+        StreamUtils.skip(delegate, claimOffset);
+        currentOffset = claimOffset;
+    }
+}
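
A brief usage sketch of the class above, illustrating the mark()/reset() behavior it provides: content can be re-read without buffering it in memory, because reset() simply re-opens the delegate from the Content Repository and skips back to the marked offset. The helper method below is illustrative only; the repository and claim are assumed to be supplied by the framework:

    import org.apache.nifi.controller.repository.ContentRepository;
    import org.apache.nifi.controller.repository.claim.ContentClaim;
    import org.apache.nifi.controller.repository.io.ContentClaimInputStream;
    import org.apache.nifi.stream.io.StreamUtils;

    import java.io.IOException;

    class ContentClaimRereadExample {
        // Reads the same span of a FlowFile's content twice without buffering it.
        static byte[][] readTwice(final ContentRepository repo, final ContentClaim claim, final long offset, final int length) throws IOException {
            try (final ContentClaimInputStream in = new ContentClaimInputStream(repo, claim, offset)) {
                in.mark(length);                     // remember the current claim offset
                final byte[] first = new byte[length];
                StreamUtils.fillBuffer(in, first);   // first pass over the content

                in.reset();                          // re-opens the delegate and skips back to the mark
                final byte[] second = new byte[length];
                StreamUtils.fillBuffer(in, second);  // second pass reads the same bytes again
                return new byte[][] { first, second };
            }
        }
    }
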
index 3c6a003..3ab7486 100644 (file)
  */
 package org.apache.nifi.controller.service;
 
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.FormatUtils;
 
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 public class StandardConfigurationContext implements ConfigurationContext {
 
     private final ComponentNode component;
@@ -74,7 +74,17 @@ public class StandardConfigurationContext implements ConfigurationContext {
     @Override
     public PropertyValue getProperty(final PropertyDescriptor property) {
         final String configuredValue = component.getProperty(property);
-        return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, serviceLookup, preparedQueries.get(property), variableRegistry);
+        final String resolvedValue = (configuredValue == null) ? property.getDefaultValue() : configuredValue;
+
+        if (resolvedValue == null) {
+            // We need to get the 'canonical representation' of the property descriptor from the component itself,
+            // since the supplied PropertyDescriptor may have been built using only the name, and without the proper
+            // default value.
+            final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
+            return new StandardPropertyValue(resolvedDescriptor.getDefaultValue(), serviceLookup, preparedQueries.get(property), variableRegistry);
+        }
+
+        return new StandardPropertyValue(resolvedValue, serviceLookup, preparedQueries.get(property), variableRegistry);
     }
 
     @Override
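
To make the fix above concrete: a PropertyDescriptor that a caller builds from only a property name carries no default value, so the previous code could hand back a null value even though the component's own (canonical) descriptor declares a default. A hedged sketch of the difference; the property name and default below are made up for illustration:

    import org.apache.nifi.components.PropertyDescriptor;

    class CanonicalDescriptorExample {
        static void demonstrate() {
            // A descriptor as a caller might construct it: name only, no default value.
            final PropertyDescriptor byNameOnly = new PropertyDescriptor.Builder()
                    .name("Cache Size")
                    .build();
            assert byNameOnly.getDefaultValue() == null;

            // The canonical descriptor, as the component itself registered it, carries the default.
            final PropertyDescriptor canonical = new PropertyDescriptor.Builder()
                    .name("Cache Size")
                    .defaultValue("100")
                    .build();
            assert "100".equals(canonical.getDefaultValue());

            // StandardConfigurationContext#getProperty now resolves the canonical descriptor via
            // component.getPropertyDescriptor(property.getName()) when no value is configured,
            // so the declared default is honored even for a name-only descriptor.
        }
    }
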
index 1880864..d21cc5c 100644 (file)
@@ -1223,6 +1223,7 @@ public class TestStandardProcessSession {
             session.read(ff1, new InputStreamCallback() {
                 @Override
                 public void process(InputStream in) throws IOException {
+                    in.read();
                 }
             });
             Assert.fail("Expected MissingFlowFileException");
@@ -1401,6 +1402,7 @@ public class TestStandardProcessSession {
             session.write(ff1, new StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws IOException {
+                    in.read();
                 }
             });
             Assert.fail("Expected MissingFlowFileException");
@@ -1444,6 +1446,7 @@ public class TestStandardProcessSession {
             session.write(ff2, new StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws IOException {
+                    in.read();
                 }
             });
             Assert.fail("Expected ContentNotFoundException");
@@ -1486,6 +1489,7 @@ public class TestStandardProcessSession {
             session.read(ff2, new InputStreamCallback() {
                 @Override
                 public void process(InputStream in) throws IOException {
+                    in.read();
                 }
             });
             Assert.fail("Expected MissingFlowFileException");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java
new file mode 100644 (file)
index 0000000..4cba0b3
--- /dev/null
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.io;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestContentClaimInputStream {
+
+    private ContentRepository repo;
+    private ContentClaim contentClaim;
+    private AtomicBoolean closed = new AtomicBoolean();
+
+    @Before
+    public void setup() throws IOException {
+        repo = mock(ContentRepository.class);
+        contentClaim = mock(ContentClaim.class);
+
+        closed.set(false);
+        Mockito.when(repo.read(contentClaim)).thenAnswer(invocation -> new ByteArrayInputStream("hello".getBytes()) {
+            @Override
+            public void close() throws IOException {
+                super.close();
+                closed.set(true);
+            }
+        });
+    }
+
+
+    @Test
+    public void testStreamCreatedFromRepository() throws IOException {
+        final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L);
+
+        final byte[] buff = new byte[5];
+        StreamUtils.fillBuffer(in, buff);
+
+        Mockito.verify(repo, Mockito.times(1)).read(contentClaim);
+        Mockito.verifyNoMoreInteractions(repo);
+
+        final String contentRead = new String(buff);
+        assertEquals("hello", contentRead);
+
+        assertEquals(5, in.getBytesConsumed());
+        assertFalse(closed.get());
+
+        // Ensure that underlying stream is closed
+        in.close();
+        assertTrue(closed.get());
+    }
+
+
+    @Test
+    public void testThatContentIsSkipped() throws IOException {
+        final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L);
+
+        final byte[] buff = new byte[2];
+        StreamUtils.fillBuffer(in, buff);
+
+        Mockito.verify(repo, Mockito.times(1)).read(contentClaim);
+        Mockito.verifyNoMoreInteractions(repo);
+
+        final String contentRead = new String(buff);
+        assertEquals("lo", contentRead);
+
+        assertEquals(2, in.getBytesConsumed());
+        assertFalse(closed.get());
+
+        // Ensure that underlying stream is closed
+        in.close();
+        assertTrue(closed.get());
+    }
+
+
+    @Test
+    public void testRereadEntireClaim() throws IOException {
+        final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L);
+
+        final byte[] buff = new byte[5];
+
+        final int invocations = 10;
+        for (int i=0; i < invocations; i++) {
+            in.mark(5);
+
+            StreamUtils.fillBuffer(in, buff, true);
+
+            final String contentRead = new String(buff);
+            assertEquals("hello", contentRead);
+
+            assertEquals(5 * (i+1), in.getBytesConsumed());
+            assertEquals(5, in.getCurrentOffset());
+            assertEquals(-1, in.read());
+
+            in.reset();
+        }
+
+        Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read
+        Mockito.verifyNoMoreInteractions(repo);
+
+        // Ensure that underlying stream is closed
+        in.close();
+        assertTrue(closed.get());
+    }
+
+
+    @Test
+    public void testMultipleResetCallsAfterMark() throws IOException {
+        final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L);
+
+        final byte[] buff = new byte[5];
+
+        final int invocations = 10;
+        in.mark(5);
+
+        for (int i=0; i < invocations; i++) {
+            StreamUtils.fillBuffer(in, buff, true);
+
+            final String contentRead = new String(buff);
+            assertEquals("hello", contentRead);
+
+            assertEquals(5 * (i+1), in.getBytesConsumed());
+            assertEquals(5, in.getCurrentOffset());
+            assertEquals(-1, in.read());
+
+            in.reset();
+        }
+
+        Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read
+        Mockito.verifyNoMoreInteractions(repo);
+
+        // Ensure that underlying stream is closed
+        in.close();
+        assertTrue(closed.get());
+    }
+
+
+    @Test
+    public void testRereadWithOffset() throws IOException {
+        final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L);
+
+        final byte[] buff = new byte[2];
+
+        final int invocations = 10;
+        for (int i=0; i < invocations; i++) {
+            in.mark(5);
+
+            StreamUtils.fillBuffer(in, buff, true);
+
+            final String contentRead = new String(buff);
+            assertEquals("lo", contentRead);
+
+            assertEquals(2 * (i+1), in.getBytesConsumed());
+            assertEquals(5, in.getCurrentOffset());
+            assertEquals(-1, in.read());
+
+            in.reset();
+        }
+
+        Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read
+        Mockito.verifyNoMoreInteractions(repo);
+
+        // Ensure that underlying stream is closed
+        in.close();
+        assertTrue(closed.get());
+    }
+}
index affbe11..5f3445a 100644 (file)
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.streaming.ConnectionError;
+import org.apache.hive.streaming.HiveRecordWriter;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;
 import org.apache.hive.streaming.SerializationError;
@@ -59,11 +60,9 @@ import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.hive.AuthenticationFailedException;
 import org.apache.nifi.util.hive.HiveConfigurator;
 import org.apache.nifi.util.hive.HiveOptions;
-import org.apache.hive.streaming.HiveRecordWriter;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.ValidationResources;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -396,11 +395,10 @@ public class PutHive3Streaming extends AbstractProcessor {
 
         StreamingConnection hiveStreamingConnection = null;
 
-        try (final InputStream rawIn = session.read(flowFile)) {
+        try (final InputStream in = session.read(flowFile)) {
             final RecordReader reader;
 
-            try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-
+            try {
                 // if we fail to create the RecordReader then we want to route to failure, so we need to
                 // handle this separately from the other IOExceptions which normally route to retry
                 try {
@@ -415,7 +413,7 @@ public class PutHive3Streaming extends AbstractProcessor {
                 hiveStreamingConnection.beginTransaction();
                 hiveStreamingConnection.write(in);
                 hiveStreamingConnection.commitTransaction();
-                rawIn.close();
+                in.close();
 
                 Map<String, String> updateAttributes = new HashMap<>();
                 updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
index 6ce4161..2e1ef71 100644 (file)
@@ -69,8 +69,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -296,36 +296,67 @@ public class JoltTransformRecord extends AbstractProcessor {
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
         final RecordSchema schema;
-        final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
         try (final InputStream in = session.read(original);
              final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
             schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema());
-            Record record;
 
             FlowFile transformed = session.create(original);
             final Map<String, String> attributes = new HashMap<>();
             final WriteResult writeResult;
-            try (final OutputStream out = session.write(transformed);
-                 final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
 
-                final JoltTransform transform = getTransform(context, original);
-                writer.beginRecordSet();
-                while ((record = reader.nextRecord()) != null) {
-                    Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
-                    // JOLT expects arrays to be of type List where our Record code uses Object[].
-                    // Make another pass of the transformed objects to change Object[] to List.
-                    recordMap = (Map<String, Object>) normalizeJoltObjects(recordMap);
-                    Object transformedObject = transform(transform, recordMap);
-                    // JOLT expects arrays to be of type List where our Record code uses Object[].
-                    // Make another pass of the transformed objects to change List to Object[].
-                    Record r = DataTypeUtils.toRecord(normalizeRecordObjects(transformedObject), schema, "r");
-                    writer.write(r);
+            try {
+                // We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure
+                // and therefore a different Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the
+                // Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the
+                // schema determined by the Record Reader.
+                final Record firstRecord = reader.nextRecord();
+                if (firstRecord == null) {
+                    try (final OutputStream out = session.write(transformed);
+                         final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
+
+                        writer.beginRecordSet();
+                        writeResult = writer.finishRecordSet();
+
+                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                        attributes.putAll(writeResult.getAttributes());
+                    }
+
+                    transformed = session.putAllAttributes(transformed, attributes);
+                    session.transfer(transformed, REL_SUCCESS);
+                    session.transfer(original, REL_ORIGINAL);
+                    logger.info("{} had no Records to transform", new Object[]{original});
+                    return;
                 }
-                writeResult = writer.finishRecordSet();
 
-                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-                attributes.putAll(writeResult.getAttributes());
+                final JoltTransform transform = getTransform(context, original);
+                final Record transformedFirstRecord = transform(firstRecord, transform);
+
+                final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema());
+
+                // TODO: Is it possible that two Records with the same input schema could have different schemas after transformation?
+                // If so, then we need to avoid this pattern of writing all Records from the input FlowFile to the same output FlowFile
+                // and instead use a Map<RecordSchema, RecordSetWriter>. This way, even if many different output schemas are possible,
+                // the output FlowFiles will each only contain records that have the same schema.
+                try (final OutputStream out = session.write(transformed);
+                     final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
+
+                    writer.beginRecordSet();
+
+                    writer.write(transformedFirstRecord);
+
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        final Record transformedRecord = transform(record, transform);
+                        writer.write(transformedRecord);
+                    }
+
+                    writeResult = writer.finishRecordSet();
+
+                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                    attributes.putAll(writeResult.getAttributes());
+                }
             } catch (Exception e) {
                 logger.error("Unable to write transformed records {} due to {}", new Object[]{original, e.toString(), e});
                 session.remove(transformed);
@@ -339,15 +370,27 @@ public class JoltTransformRecord extends AbstractProcessor {
             session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(original, REL_ORIGINAL);
             logger.debug("Transformed {}", new Object[]{original});
-
-
         } catch (final Exception ex) {
             logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex});
             session.transfer(original, REL_FAILURE);
-            return;
         }
     }
 
+    private Record transform(final Record record, final JoltTransform transform) {
+        Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+
+        // JOLT expects arrays to be of type List where our Record code uses Object[].
+        // Make another pass of the transformed objects to change Object[] to List.
+        recordMap = (Map<String, Object>) normalizeJoltObjects(recordMap);
+        final Object transformedObject = transform(transform, recordMap);
+
+        // JOLT expects arrays to be of type List where our Record code uses Object[].
+        // Make another pass of the transformed objects to change List to Object[].
+        final Object normalizedRecordValues = normalizeRecordObjects(transformedObject);
+        final Record updatedRecord = DataTypeUtils.toRecord(normalizedRecordValues, "r");
+        return updatedRecord;
+    }
+
     private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) {
         final Optional<String> specString;
         if (context.getProperty(JOLT_SPEC).isSet()) {
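
Note that the transform(...) helper above converts the transformed structure back into a Record using the two-argument DataTypeUtils.toRecord(value, name) variant, which infers the schema from the data itself rather than reusing the reader's schema; this is what lets a writer that inherits the record schema pick up the post-transform structure. A small illustrative sketch of that call, with made-up field names and values:

    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    import java.util.HashMap;
    import java.util.Map;

    class InferredRecordExample {
        static Record fromTransformedMap() {
            // Values as they might look after a JOLT transform; the structure no longer
            // has to match the schema the reader originally reported.
            final Map<String, Object> transformed = new HashMap<>();
            transformed.put("RatingRange", 5);
            transformed.put("MaxLabel", "High");

            // Schema is inferred from the values themselves.
            return DataTypeUtils.toRecord(transformed, "r");
        }
    }
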
index 313781d..e7b43b5 100644 (file)
@@ -167,15 +167,19 @@ public class TestJoltTransformRecord {
     @Test
     public void testInvalidFlowFileContent() throws IOException {
         generateTestData(1, null);
+
         final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+
         runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
         runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
         runner.setProperty(writer, "Pretty Print JSON", "true");
-        runner.enableControllerService(writer);
-        final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
         runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+
+        runner.enableControllerService(writer);
         parser.failAfter(0);
         runner.enqueue("invalid json");
+
         runner.run();
         runner.assertAllFlowFilesTransferred(JoltTransformRecord.REL_FAILURE);
     }
@@ -494,19 +498,23 @@ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
     public void testJoltSpecEL() throws IOException {
         generateTestData(1, null);
         final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")));
+
         runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
         runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
         runner.setProperty(writer, "Pretty Print JSON", "true");
         runner.enableControllerService(writer);
-        final String spec = "${joltSpec}";
-        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${joltSpec}");
         runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR);
+
         final Map<String, String> attributes = Collections.singletonMap("joltSpec",
                 "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}");
         runner.enqueue(new byte[0], attributes);
+
         runner.run();
         runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
-runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+
         final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
         assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))),
                 new String(transformed.toByteArray()));
index e670c7d..6d6af61 100644 (file)
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -62,6 +49,18 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. "
     + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
@@ -329,8 +328,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
-                        public void process(final InputStream rawIn) throws IOException {
-                            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        public void process(final InputStream in) throws IOException {
+                            try {
                                 final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
                                 final RecordSet recordSet = reader.createRecordSet();
 
index 8bbec17..0815518 100644 (file)
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -65,6 +49,21 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.11.x"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.11.x Producer API. "
     + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
@@ -404,8 +403,8 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
-                        public void process(final InputStream rawIn) throws IOException {
-                            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        public void process(final InputStream in) throws IOException {
+                            try {
                                 final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                                 final RecordSet recordSet = reader.createRecordSet();
 
index 82a3918..bb0478c 100644 (file)
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -65,6 +49,21 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 1.0 Producer API. "
     + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
@@ -406,8 +405,8 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
-                        public void process(final InputStream rawIn) throws IOException {
-                            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        public void process(final InputStream in) throws IOException {
+                            try {
                                 final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                                 final RecordSet recordSet = reader.createRecordSet();
 
index 88a6b95..2dde669 100644 (file)
@@ -49,7 +49,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
@@ -404,8 +403,8 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
-                        public void process(final InputStream rawIn) throws IOException {
-                            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        public void process(final InputStream in) throws IOException {
+                            try {
                                 final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                                 final RecordSet recordSet = reader.createRecordSet();
 
index 9dbe8c1..e4371f6 100644 (file)
@@ -125,13 +125,36 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
 
                     try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
 
-                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        // Get the first record and process it before we create the Record Writer. We do this so that if the Processor
+                        // updates the Record's schema, we can provide an updated schema to the Record Writer. If there are no records,
+                        // then we can simply create the Writer with the Reader's schema and begin & end the Record Set.
+                        Record firstRecord = reader.nextRecord();
+                        if (firstRecord == null) {
+                            final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
+                            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
+                                writer.beginRecordSet();
+
+                                final WriteResult writeResult = writer.finishRecordSet();
+                                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                attributes.putAll(writeResult.getAttributes());
+                                recordCount.set(writeResult.getRecordCount());
+                            }
+
+                            return;
+                        }
+
+                        firstRecord = AbstractRecordProcessor.this.process(firstRecord, original, context);
+
+                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
                         try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
                             writer.beginRecordSet();
 
+                            writer.write(firstRecord);
+
                             Record record;
                             while ((record = reader.nextRecord()) != null) {
-                                final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context);
+                                final Record processed = AbstractRecordProcessor.this.process(record, original, context);
                                 writer.write(processed);
                             }
 
@@ -166,5 +189,5 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
         getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile});
     }
 
-    protected abstract Record process(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context);
+    protected abstract Record process(Record record, FlowFile flowFile, ProcessContext context);
 }
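
With the signature change above, subclasses no longer receive a pre-computed write schema; the writer's schema is instead derived from the first processed Record. A hypothetical subclass showing the new hook (class and field names are made up; whether a newly added field is retained depends on how the Record implementation incorporates inactive fields, as shown for LookupRecord further below):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.serialization.record.Record;

    // Hypothetical processor that stamps each record with a processing time. Because the
    // write schema is now obtained from the first processed record, a field added here can
    // be reflected in the output when the writer inherits the record's schema.
    public class AddTimestampRecordProcessor extends AbstractRecordProcessor {

        @Override
        protected Record process(final Record record, final FlowFile flowFile, final ProcessContext context) {
            record.setValue("processedAt", System.currentTimeMillis());
            return record;
        }
    }
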
index 374ed48..9d96d34 100644 (file)
 
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -48,6 +37,17 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.Tuple;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
     static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
         .name("record-reader")
@@ -112,18 +112,29 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-
         final AtomicInteger numRecords = new AtomicInteger(0);
         final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
         final FlowFile original = flowFile;
         final Map<String, String> originalAttributes = original.getAttributes();
+
         try {
             session.read(flowFile, new InputStreamCallback() {
                 @Override
                 public void process(final InputStream in) throws IOException {
                     try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
 
-                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final Record firstRecord = reader.nextRecord();
+                        if (firstRecord == null) {
+                            getLogger().info("{} has no Records, so routing just the original FlowFile to 'original'", new Object[] {original});
+                            return;
+                        }
+
+                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
+
+                        final Set<Relationship> firstRecordRelationships = route(firstRecord, writeSchema, original, context, flowFileContext);
+                        for (final Relationship relationship : firstRecordRelationships) {
+                            writeRecord(firstRecord, relationship, writers, session, original, originalAttributes, writerFactory);
+                        }
 
                         Record record;
                         while ((record = reader.nextRecord()) != null) {
@@ -131,21 +142,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
                             numRecords.incrementAndGet();
 
                             for (final Relationship relationship : relationships) {
-                                final RecordSetWriter recordSetWriter;
-                                Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
-                                if (tuple == null) {
-                                    FlowFile outFlowFile = session.create(original);
-                                    final OutputStream out = session.write(outFlowFile);
-                                    recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out);
-                                    recordSetWriter.beginRecordSet();
-
-                                    tuple = new Tuple<>(outFlowFile, recordSetWriter);
-                                    writers.put(relationship, tuple);
-                                } else {
-                                    recordSetWriter = tuple.getValue();
-                                }
-
-                                recordSetWriter.write(record);
+                                writeRecord(record, relationship, writers, session, original, originalAttributes, writerFactory);
                             }
                         }
                     } catch (final SchemaNotFoundException | MalformedRecordException e) {
@@ -216,6 +213,28 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
         getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords});
     }
 
+    private void writeRecord(final Record record, final Relationship relationship, final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers, final ProcessSession session,
+                             final FlowFile original, final Map<String, String> originalAttributes, final RecordSetWriterFactory writerFactory) throws IOException, SchemaNotFoundException {
+        final RecordSetWriter recordSetWriter;
+        Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
+
+        if (tuple == null) {
+            FlowFile outFlowFile = session.create(original);
+            final OutputStream out = session.write(outFlowFile);
+
+            final RecordSchema recordWriteSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
+            recordSetWriter = writerFactory.createWriter(getLogger(), recordWriteSchema, out);
+            recordSetWriter.beginRecordSet();
+
+            tuple = new Tuple<>(outFlowFile, recordSetWriter);
+            writers.put(relationship, tuple);
+        } else {
+            recordSetWriter = tuple.getValue();
+        }
+
+        recordSetWriter.write(record);
+    }
+
     protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);
 
     protected abstract boolean isRouteOriginal();
index a2d4e69..3b24d79 100644 (file)
@@ -30,7 +30,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -60,7 +59,7 @@ public class ConvertRecord extends AbstractRecordProcessor {
     }
 
     @Override
-    protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) {
+    protected Record process(final Record record, final FlowFile flowFile, final ProcessContext context) {
         return record;
     }
 
index c687f74..b9686b2 100644 (file)
@@ -44,7 +44,10 @@ import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.Tuple;
@@ -157,7 +160,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
     private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED);
     private static final Set<Relationship> SUCCESS_COLLECTION = Collections.singleton(REL_SUCCESS);
 
-    private volatile Set<Relationship> relationships = new HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE}));
+    private volatile Set<Relationship> relationships = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
     private volatile boolean routeToMatchedUnmatched = false;
 
     @OnScheduled
@@ -197,8 +200,8 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
     @SuppressWarnings("unchecked")
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         final Set<String> dynamicPropNames = validationContext.getProperties().keySet().stream()
-            .filter(prop -> prop.isDynamic())
-            .map(prop -> prop.getName())
+            .filter(PropertyDescriptor::isDynamic)
+            .map(PropertyDescriptor::getName)
             .collect(Collectors.toSet());
 
         if (dynamicPropNames.isEmpty()) {
@@ -305,8 +308,6 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         // Ensure that the Record has the appropriate schema to account for the newly added values
         final RecordPath resultPath = flowFileContext.getValue();
         if (resultPath != null) {
-            record.incorporateSchema(writeSchema);
-
             final Object lookupValue = lookupValueOption.get();
             final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
 
@@ -327,19 +328,25 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
 
                         for (final String fieldName : lookupRecord.getRawFieldNames()) {
                             final Object value = lookupRecord.getValue(fieldName);
-                            destinationRecord.setValue(fieldName, value);
+
+                            final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
+                            if (recordFieldOption.isPresent()) {
+                                destinationRecord.setValue(recordFieldOption.get(), value);
+                            } else {
+                                destinationRecord.setValue(fieldName, value);
+                            }
                         }
                     } else {
                         final Optional<Record> parentOption = fieldVal.getParentRecord();
-
-                        if (parentOption.isPresent()) {
-                            parentOption.get().setValue(fieldVal.getField().getFieldName(), lookupRecord);
-                        }
+                        parentOption.ifPresent(parent -> parent.setValue(fieldVal.getField(), lookupRecord));
                     }
                 });
             } else {
-                resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue));
+                final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
+                resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
             }
+
+            record.incorporateInactiveFields();
         }
 
         final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
index 7d9981b..c5273d8 100644 (file)
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
-
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteConnection;
@@ -58,7 +36,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -84,6 +61,30 @@ import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 @EventDriven
 @SideEffectFree
@@ -141,10 +142,9 @@ public class QueryRecord extends AbstractProcessor {
     static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
         .name("cache-schema")
         .displayName("Cache Schema")
-        .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
-            + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
-            + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
-            + "same schema, or if the SQL SELECT statement uses the Expression Language, this value should be set to false.")
+        .description("This property is no longer used. It remains solely for backward compatibility in order to avoid making existing Processors invalid upon upgrade. This property will be" +
+            " removed in future versions. Now, instead of forcing the user to understand the semantics of schema caching, the Processor caches up to 25 schemas and automatically rolls off the" +
+            " old schemas. This provides the same performance when caching was enabled previously and in some cases very significant performance improvements if caching was previously disabled.")
         .expressionLanguageSupported(ExpressionLanguageScope.NONE)
         .allowableValues("true", "false")
         .defaultValue("true")
@@ -165,7 +165,10 @@ public class QueryRecord extends AbstractProcessor {
     private List<PropertyDescriptor> properties;
     private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<>());
 
-    private final Map<String, BlockingQueue<CachedStatement>> statementQueues = new HashMap<>();
+    private final Cache<Tuple<String, RecordSchema>, BlockingQueue<CachedStatement>> statementQueues = Caffeine.newBuilder()
+        .maximumSize(25)
+        .removalListener(this::onCacheEviction)
+        .build();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -215,25 +218,6 @@ public class QueryRecord extends AbstractProcessor {
     }
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final boolean cache = validationContext.getProperty(CACHE_SCHEMA).asBoolean();
-        if (cache) {
-            for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
-                if (descriptor.isDynamic() && validationContext.isExpressionLanguagePresent(validationContext.getProperty(descriptor).getValue())) {
-                    return Collections.singleton(new ValidationResult.Builder()
-                        .subject("Cache Schema")
-                        .input("true")
-                        .valid(false)
-                        .explanation("Cannot have 'Cache Schema' property set to true if any SQL statement makes use of the Expression Language")
-                        .build());
-                }
-            }
-        }
-
-        return Collections.emptyList();
-    }
-
-    @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
             .name(propertyDescriptorName)
@@ -246,6 +230,26 @@ public class QueryRecord extends AbstractProcessor {
             .build();
     }
 
+    @OnStopped
+    public synchronized void cleanup() {
+        for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.asMap().values()) {
+            clearQueue(statementQueue);
+        }
+
+        statementQueues.invalidateAll();
+    }
+
+    private void onCacheEviction(final Tuple<String, RecordSchema> key, final BlockingQueue<CachedStatement> queue, final RemovalCause cause) {
+        clearQueue(queue);
+    }
+
+    private void clearQueue(final BlockingQueue<CachedStatement> statementQueue) {
+        CachedStatement stmt;
+        while ((stmt = statementQueue.poll()) != null) {
+            closeQuietly(stmt.getStatement(), stmt.getConnection());
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         final FlowFile original = session.get();
@@ -262,13 +266,14 @@ public class QueryRecord extends AbstractProcessor {
         final Set<FlowFile> createdFlowFiles = new HashSet<>();
 
         // Determine the Record Reader's schema
+        final RecordSchema writerSchema;
         final RecordSchema readerSchema;
         try (final InputStream rawIn = session.read(original)) {
             final Map<String, String> originalAttributes = original.getAttributes();
             final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
-            final RecordSchema inputSchema = reader.getSchema();
+            readerSchema = reader.getSchema();
 
-            readerSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema);
+            writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema);
         } catch (final Exception e) {
             getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
             session.transfer(original, REL_FAILURE);
@@ -296,12 +301,7 @@ public class QueryRecord extends AbstractProcessor {
                 try {
                     final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
                     final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
-                    final QueryResult queryResult;
-                    if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
-                        queryResult = queryWithCache(session, original, sql, context, recordReaderFactory);
-                    } else {
-                        queryResult = query(session, original, sql, context, recordReaderFactory);
-                    }
+                    final QueryResult queryResult = query(session, original, readerSchema, sql, recordReaderFactory);
 
                     final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
                     try {
@@ -313,7 +313,7 @@ public class QueryRecord extends AbstractProcessor {
                                 final RecordSchema writeSchema;
 
                                 try {
-                                    recordSet = new ResultSetRecordSet(rs, readerSchema);
+                                    recordSet = new ResultSetRecordSet(rs, writerSchema);
                                     final RecordSchema resultSetSchema = recordSet.getSchema();
                                     writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
                                 } catch (final SQLException | SchemaNotFoundException e) {
@@ -389,82 +389,59 @@ public class QueryRecord extends AbstractProcessor {
     }
 
 
-    private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
-        final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException {
-
-        final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
-        if (statementQueue == null) {
-            return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
-        }
+    private synchronized CachedStatement getStatement(final String sql, final RecordSchema schema, final Supplier<CachedStatement> statementBuilder) {
+        final Tuple<String, RecordSchema> tuple = new Tuple<>(sql, schema);
+        final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(tuple, key -> new LinkedBlockingQueue<>());
 
         final CachedStatement cachedStmt = statementQueue.poll();
         if (cachedStmt != null) {
             return cachedStmt;
         }
 
-        return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
+        return statementBuilder.get();
     }
 
-    private CachedStatement buildCachedStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
-        final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException {
+    private CachedStatement buildCachedStatement(final String sql, final ProcessSession session,  final FlowFile flowFile, final RecordSchema schema,
+                                                 final RecordReaderFactory recordReaderFactory) {
 
-        final CalciteConnection connection = connectionSupplier.get();
+        final CalciteConnection connection = createConnection();
         final SchemaPlus rootSchema = connection.getRootSchema();
 
-        final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordReaderFactory, getLogger());
+        final FlowFileTable flowFileTable = new FlowFileTable(session, flowFile, schema, recordReaderFactory, getLogger());
         rootSchema.add("FLOWFILE", flowFileTable);
         rootSchema.setCacheEnabled(false);
 
-        final PreparedStatement stmt = connection.prepareStatement(sql);
-        return new CachedStatement(stmt, flowFileTable, connection);
-    }
-
-    @OnStopped
-    public synchronized void cleanup() {
-        for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.values()) {
-            CachedStatement stmt;
-            while ((stmt = statementQueue.poll()) != null) {
-                closeQuietly(stmt.getStatement(), stmt.getConnection());
-            }
+        try {
+            final PreparedStatement stmt = connection.prepareStatement(sql);
+            return new CachedStatement(stmt, flowFileTable, connection);
+        } catch (final SQLException e) {
+            throw new ProcessException(e);
         }
-
-        statementQueues.clear();
     }
 
-    @OnScheduled
-    public synchronized void setupQueues(final ProcessContext context) {
-        // Create a Queue of PreparedStatements for each property that is user-defined. This allows us to easily poll the
-        // queue and add as necessary, knowing that the queue already exists.
-        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
-            if (!descriptor.isDynamic()) {
-                continue;
-            }
 
-            final String sql = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
-            final BlockingQueue<CachedStatement> queue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
-            statementQueues.put(sql, queue);
+    private CalciteConnection createConnection() {
+        final Properties properties = new Properties();
+        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
+
+        try {
+            final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
+            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+            return calciteConnection;
+        } catch (final Exception e) {
+            throw new ProcessException(e);
         }
     }
 
-    protected QueryResult queryWithCache(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
-        final RecordReaderFactory recordParserFactory) throws SQLException {
 
-        final Supplier<CalciteConnection> connectionSupplier = () -> {
-            final Properties properties = new Properties();
-            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
+    protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final RecordSchema schema, final String sql, final RecordReaderFactory recordReaderFactory)
+                throws SQLException {
 
-            try {
-                final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
-                final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-                return calciteConnection;
-            } catch (final Exception e) {
-                throw new ProcessException(e);
-            }
-        };
+        final Supplier<CachedStatement> statementBuilder = () -> buildCachedStatement(sql, session, flowFile, schema, recordReaderFactory);
 
-        final CachedStatement cachedStatement = getStatement(sql, connectionSupplier, session, flowFile, recordParserFactory);
+        final CachedStatement cachedStatement = getStatement(sql, schema, statementBuilder);
         final PreparedStatement stmt = cachedStatement.getStatement();
-        final FlowFileTable<?, ?> table = cachedStatement.getTable();
+        final FlowFileTable table = cachedStatement.getTable();
         table.setFlowFile(session, flowFile);
 
         final ResultSet rs;
@@ -480,7 +457,7 @@ public class QueryRecord extends AbstractProcessor {
             public void close() throws IOException {
                 table.close();
 
-                final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
+                final BlockingQueue<CachedStatement> statementQueue = statementQueues.getIfPresent(new Tuple<>(sql, schema));
                 if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
                     try {
                         cachedStatement.getConnection().close();
@@ -503,58 +480,6 @@ public class QueryRecord extends AbstractProcessor {
         };
     }
 
-    protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
-        final RecordReaderFactory recordParserFactory) throws SQLException {
-
-        final Properties properties = new Properties();
-        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
-
-        Connection connection = null;
-        ResultSet resultSet = null;
-        Statement statement = null;
-        try {
-            connection = DriverManager.getConnection("jdbc:calcite:", properties);
-            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-            final SchemaPlus rootSchema = calciteConnection.getRootSchema();
-
-            final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordParserFactory, getLogger());
-            rootSchema.add("FLOWFILE", flowFileTable);
-            rootSchema.setCacheEnabled(false);
-
-            statement = connection.createStatement();
-
-            try {
-                resultSet = statement.executeQuery(sql);
-            } catch (final Throwable t) {
-                flowFileTable.close();
-                throw t;
-            }
-
-            final ResultSet rs = resultSet;
-            final Statement stmt = statement;
-            final Connection conn = connection;
-
-            return new QueryResult() {
-                @Override
-                public void close() throws IOException {
-                    closeQuietly(rs, stmt, conn);
-                }
-
-                @Override
-                public ResultSet getResultSet() {
-                    return rs;
-                }
-
-                @Override
-                public int getRecordsRead() {
-                    return flowFileTable.getRecordsRead();
-                }
-            };
-        } catch (final Exception e) {
-            closeQuietly(resultSet, statement, connection);
-            throw e;
-        }
-    }
 
     private void closeQuietly(final AutoCloseable... closeables) {
         if (closeables == null) {
@@ -611,24 +536,24 @@ public class QueryRecord extends AbstractProcessor {
         }
     }
 
-    private static interface QueryResult extends Closeable {
+    private interface QueryResult extends Closeable {
         ResultSet getResultSet();
 
         int getRecordsRead();
     }
 
     private static class CachedStatement {
-        private final FlowFileTable<?, ?> table;
+        private final FlowFileTable table;
         private final PreparedStatement statement;
         private final Connection connection;
 
-        public CachedStatement(final PreparedStatement statement, final FlowFileTable<?, ?> table, final Connection connection) {
+        public CachedStatement(final PreparedStatement statement, final FlowFileTable table, final Connection connection) {
             this.statement = statement;
             this.table = table;
             this.connection = connection;
         }
 
-        public FlowFileTable<?, ?> getTable() {
+        public FlowFileTable getTable() {
             return table;
         }
 
index 9b5d209..20e5dd1 100644 (file)
 
 package org.apache.nifi.processors.standard;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -52,9 +43,19 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 
 @EventDriven
 @SideEffectFree
@@ -113,8 +114,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final boolean containsDynamic = validationContext.getProperties().keySet().stream()
-            .anyMatch(property -> property.isDynamic());
+        final boolean containsDynamic = validationContext.getProperties().keySet().stream().anyMatch(PropertyDescriptor::isDynamic);
 
         if (containsDynamic) {
             return Collections.emptyList();
@@ -142,13 +142,9 @@ public class UpdateRecord extends AbstractRecordProcessor {
     }
 
     @Override
-    protected Record process(Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) {
+    protected Record process(Record record, final FlowFile flowFile, final ProcessContext context) {
         final boolean evaluateValueAsRecordPath = context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue());
 
-        // Incorporate the RecordSchema that we will use for writing records into the Schema that we have
-        // for the record, because it's possible that the updates to the record will not be valid otherwise.
-        record.incorporateSchema(writeSchema);
-
         for (final String recordPathText : recordPaths) {
             final RecordPath recordPath = recordPathCache.getCompiled(recordPathText);
             final RecordPathResult result = recordPath.evaluate(record);
@@ -177,15 +173,17 @@ public class UpdateRecord extends AbstractRecordProcessor {
                         fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
 
                         final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
-                        fieldVal.updateValue(evaluatedReplacementVal);
+                        fieldVal.updateValue(evaluatedReplacementVal, RecordFieldType.STRING.getDataType());
                     });
                 } else {
                     final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile).getValue();
-                    result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(evaluatedReplacementVal));
+                    result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(evaluatedReplacementVal, RecordFieldType.STRING.getDataType()));
                 }
             }
         }
 
+        record.incorporateInactiveFields();
+
         return record;
     }
 
@@ -204,7 +202,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
             final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal);
             final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
             final Object replacementObject = getReplacementObject(selectedFields);
-            fieldVal.updateValue(replacementObject);
+            updateFieldValue(fieldVal, replacementObject);
 
             record = updateRecord(destinationFieldValues, selectedFields, record);
         }
@@ -222,29 +220,44 @@ public class UpdateRecord extends AbstractRecordProcessor {
                 return (Record) replacement;
             }
 
+            final FieldValue replacementFieldValue = (FieldValue) replacement;
+            if (replacementFieldValue.getValue() instanceof Record) {
+                return (Record) replacementFieldValue.getValue();
+            }
+
             final List<RecordField> fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList());
             final RecordSchema schema = new SimpleRecordSchema(fields);
             final Record mapRecord = new MapRecord(schema, new HashMap<>());
             for (final FieldValue selectedField : selectedFields) {
-                mapRecord.setValue(selectedField.getField().getFieldName(), selectedField.getValue());
+                mapRecord.setValue(selectedField.getField(), selectedField.getValue());
             }
 
             return mapRecord;
         } else {
             for (final FieldValue fieldVal : destinationFields) {
-                fieldVal.updateValue(getReplacementObject(selectedFields));
+                final Object replacementObject = getReplacementObject(selectedFields);
+                updateFieldValue(fieldVal, replacementObject);
             }
             return record;
         }
     }
 
+    private void updateFieldValue(final FieldValue fieldValue, final Object replacement) {
+        if (replacement instanceof FieldValue) {
+            final FieldValue replacementFieldValue = (FieldValue) replacement;
+            fieldValue.updateValue(replacementFieldValue.getValue(), replacementFieldValue.getField().getDataType());
+        } else {
+            fieldValue.updateValue(replacement);
+        }
+    }
+
     private Object getReplacementObject(final List<FieldValue> selectedFields) {
         if (selectedFields.size() > 1) {
             final List<RecordField> fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList());
             final RecordSchema schema = new SimpleRecordSchema(fields);
             final Record record = new MapRecord(schema, new HashMap<>());
             for (final FieldValue fieldVal : selectedFields) {
-                record.setValue(fieldVal.getField().getFieldName(), fieldVal.getValue());
+                record.setValue(fieldVal.getField(), fieldVal.getValue());
             }
 
             return record;
@@ -253,7 +266,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
         if (selectedFields.isEmpty()) {
             return null;
         } else {
-            return selectedFields.get(0).getValue();
+            return selectedFields.get(0);
         }
     }
 }
index 5f92311..06ffb76 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.nifi.serialization.record.Record;
 
 import java.io.InputStream;
 
-public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
+public class FlowFileEnumerator implements Enumerator<Object> {
     private final ProcessSession session;
     private final FlowFile flowFile;
     private final ComponentLog logger;
index 9e10377..8c0e2ce 100644 (file)
  */
 package org.apache.nifi.queryrecord;
 
-import java.io.InputStream;
-import java.lang.reflect.Type;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
@@ -46,18 +37,24 @@ import org.apache.calcite.util.Pair;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.lang.reflect.Type;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 
-public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable, TranslatableTable {
+public class FlowFileTable extends AbstractTable implements QueryableTable, TranslatableTable {
 
-    private final RecordReaderFactory recordParserFactory;
+    private final RecordReaderFactory recordReaderFactory;
     private final ComponentLog logger;
 
     private RecordSchema recordSchema;
@@ -67,15 +64,16 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
     private volatile FlowFile flowFile;
     private volatile int maxRecordsRead;
 
-    private final Set<FlowFileEnumerator<?>> enumerators = new HashSet<>();
+    private final Set<FlowFileEnumerator> enumerators = new HashSet<>();
 
     /**
      * Creates a FlowFile table.
      */
-    public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RecordReaderFactory recordParserFactory, final ComponentLog logger) {
+    public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RecordSchema schema, final RecordReaderFactory recordReaderFactory, final ComponentLog logger) {
         this.session = session;
         this.flowFile = flowFile;
-        this.recordParserFactory = recordParserFactory;
+        this.recordSchema = schema;
+        this.recordReaderFactory = recordReaderFactory;
         this.logger = logger;
     }
 
@@ -93,7 +91,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
 
     public void close() {
         synchronized (enumerators) {
-            for (final FlowFileEnumerator<?> enumerator : enumerators) {
+            for (final FlowFileEnumerator enumerator : enumerators) {
                 enumerator.close();
             }
         }
@@ -110,7 +108,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
             @Override
             @SuppressWarnings({"unchecked", "rawtypes"})
             public Enumerator<Object> enumerator() {
-                final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) {
+                final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordReaderFactory, fields) {
                     @Override
                     protected void onFinish() {
                         final int recordCount = getRecordsRead();
@@ -175,30 +173,16 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
             return relDataType;
         }
 
-        RecordSchema schema;
-        try (final InputStream in = session.read(flowFile)) {
-            final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger);
-            schema = recordParser.getSchema();
-        } catch (final Exception e) {
-            throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
-        }
-
         final List<String> names = new ArrayList<>();
         final List<RelDataType> types = new ArrayList<>();
 
         final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
-        for (final RecordField field : schema.getFields()) {
+        for (final RecordField field : recordSchema.getFields()) {
             names.add(field.getFieldName());
             final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
             types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
         }
 
-        logger.debug("Found Schema: {}", new Object[] {schema});
-
-        if (recordSchema == null) {
-            recordSchema = schema;
-        }
-
         relDataType = typeFactory.createStructType(Pair.zip(names, types));
         return relDataType;
     }
index afca202..b1b656f 100644 (file)
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.queryrecord;
 
-import java.util.List;
-
 import org.apache.calcite.adapter.enumerable.EnumerableConvention;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
@@ -37,6 +35,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 
+import java.util.List;
+
 /**
  * Relational expression representing a scan of a FlowFile.
  *
@@ -45,10 +45,10 @@ import org.apache.calcite.rel.type.RelDataTypeField;
  * </p>
  */
 public class FlowFileTableScan extends TableScan implements EnumerableRel {
-    final FlowFileTable<?, ?> flowFileTable;
+    final FlowFileTable flowFileTable;
     final int[] fields;
 
-    protected FlowFileTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowFileTable<?, ?> flowFileTable, final int[] fields) {
+    protected FlowFileTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowFileTable flowFileTable, final int[] fields) {
         super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
 
         this.flowFileTable = flowFileTable;
index 30b2b24..3efd9d1 100644 (file)
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.lookup.RecordLookupService;
@@ -48,6 +37,16 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
 public class TestLookupRecord {
 
     private TestRunner runner;
@@ -112,7 +111,7 @@ public class TestLookupRecord {
     }
 
     @Test
-    public void testAllMatch() throws InitializationException {
+    public void testAllMatch() {
         lookupService.addValue("John Doe", "Soccer");
         lookupService.addValue("Jane Doe", "Basketball");
         lookupService.addValue("Jimmy Doe", "Football");
@@ -129,7 +128,7 @@ public class TestLookupRecord {
     }
 
     @Test
-    public void testAllUnmatched() throws InitializationException {
+    public void testAllUnmatched() {
         runner.enqueue("");
         runner.run();
 
@@ -142,7 +141,7 @@ public class TestLookupRecord {
     }
 
     @Test
-    public void testMixtureOfMatch() throws InitializationException {
+    public void testMixtureOfMatch() {
         lookupService.addValue("John Doe", "Soccer");
         lookupService.addValue("Jimmy Doe", "Football");
 
@@ -166,7 +165,7 @@ public class TestLookupRecord {
 
 
     @Test
-    public void testResultPathNotFound() throws InitializationException {
+    public void testResultPathNotFound() {
         runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");
 
         lookupService.addValue("John Doe", "Soccer");
@@ -181,11 +180,11 @@ public class TestLookupRecord {
 
         out.assertAttributeEquals("record.count", "3");
         out.assertAttributeEquals("mime.type", "text/plain");
-        out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
+        out.assertContentEquals("John Doe,48,,Soccer\nJane Doe,47,,Basketball\nJimmy Doe,14,,Football\n");
     }
 
     @Test
-    public void testLookupPathNotFound() throws InitializationException {
+    public void testLookupPathNotFound() {
         runner.setProperty("lookup", "/other");
 
         runner.enqueue("");
@@ -200,7 +199,7 @@ public class TestLookupRecord {
     }
 
     @Test
-    public void testUnparseableData() throws InitializationException {
+    public void testUnparseableData() {
         recordReader.failAfter(1);
 
         runner.enqueue("");
@@ -213,7 +212,7 @@ public class TestLookupRecord {
     }
 
     @Test
-    public void testNoResultPath() throws InitializationException {
+    public void testNoResultPath() {
         lookupService.addValue("John Doe", "Soccer");
         lookupService.addValue("Jane Doe", "Basketball");
         lookupService.addValue("Jimmy Doe", "Football");
@@ -233,7 +232,7 @@ public class TestLookupRecord {
 
 
     @Test
-    public void testMultipleLookupPaths() throws InitializationException {
+    public void testMultipleLookupPaths() {
         lookupService.addValue("John Doe", "Soccer");
         lookupService.addValue("Jane Doe", "Basketball");
         lookupService.addValue("Jimmy Doe", "Football");
@@ -252,7 +251,7 @@ public class TestLookupRecord {
     }
 
     @Test
-    public void testInvalidUnlessAllRequiredPropertiesAdded() throws InitializationException {
+    public void testInvalidUnlessAllRequiredPropertiesAdded() {
         runner.removeProperty(new PropertyDescriptor.Builder().name("lookup").build());
         runner.setProperty("hello", "/name");
         runner.assertNotValid();
@@ -266,7 +265,7 @@ public class TestLookupRecord {
 
 
     @Test
-    public void testAddFieldsToExistingRecord() throws InitializationException, IOException {
+    public void testAddFieldsToExistingRecord() throws InitializationException {
         final RecordLookup lookupService = new RecordLookup();
         runner.addControllerService("lookup", lookupService);
         runner.enableControllerService(lookupService);
@@ -275,7 +274,7 @@ public class TestLookupRecord {
         fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+        final Record sports = new MapRecord(schema, new HashMap<>());
 
         sports.setValue("favorite", "basketball");
         sports.setValue("least", "soccer");
@@ -318,7 +317,7 @@ public class TestLookupRecord {
         fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+        final Record sports = new MapRecord(schema, new HashMap<>());
 
         sports.setValue("favorite", "basketball");
         sports.setValue("least", "soccer");
@@ -364,7 +363,7 @@ public class TestLookupRecord {
         fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+        final Record sports = new MapRecord(schema, new HashMap<>());
 
         sports.setValue("favorite", "basketball");
         sports.setValue("least", "soccer");
index 656ad5f..58387ef 100644 (file)
@@ -59,6 +59,7 @@ import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
 
 import javax.net.ssl.SSLContext;
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.Proxy;
@@ -298,8 +299,11 @@ public class RestLookupService extends AbstractControllerService implements Reco
                 return Optional.empty();
             }
 
-            InputStream is = responseBody.byteStream();
-            Record record = handleResponse(is, context);
+            final Record record;
+            try (final InputStream is = responseBody.byteStream();
+                final InputStream bufferedIn = new BufferedInputStream(is)) {
+                record = handleResponse(bufferedIn, context);
+            }
 
             return Optional.ofNullable(record);
         } catch (Exception e) {
index 6ff380d..cb42fed 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.nifi.serialization;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 
 import java.io.IOException;
@@ -38,8 +39,10 @@ public interface RecordReaderFactory extends ControllerService {
      * Create a RecordReader instance to read records from specified InputStream.
      * This method calls {@link #createRecordReader(Map, InputStream, ComponentLog)} with Attributes of the specified FlowFile.
      * @param flowFile Attributes of this FlowFile are used to resolve Record Schema via Expression Language dynamically. This can be null.
-     * @param in InputStream containing Records. This can be null or empty stream.
-     * @param logger A logger bind to a component
+     *
+     * @param in InputStream containing Records.
+     * @param logger A logger bound to a component
+     *
      * @return Created RecordReader instance
      */
     default RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
@@ -47,11 +50,26 @@ public interface RecordReaderFactory extends ControllerService {
     }
 
     /**
+     * <p>
      * Create a RecordReader instance to read records from specified InputStream.
-     * @param variables A map contains variables which is used to resolve Record Schema via Expression Language dynamically.
+     * </p>
+     *
+     * <p>
+     * Many Record Readers will need to read from the Input Stream in order to ascertain the appropriate Schema, and then
+     * re-read some of the data in order to read the Records. As a result, it is common for Readers to use
+     * {@link InputStream#mark(int) mark}/{@link InputStream#reset() reset}, so this should be considered when providing an
+     * InputStream. The {@link InputStream} that is provided by {@link org.apache.nifi.processor.ProcessSession#read(FlowFile) ProcessSession.read} /
+     * {@link org.apache.nifi.processor.ProcessSession#write(FlowFile, StreamCallback) ProcessSession.write} does provide the ability to use mark/reset
+     * and does so in a way that allows any number of bytes to be read before resetting without requiring that data be buffered. Therefore, it is recommended
+     * that when providing an InputStream from {@link org.apache.nifi.processor.ProcessSession ProcessSession} that the InputStream not be wrapped in a
+     * BufferedInputStream. However, if the stream comes from elsewhere, wrapping it may be necessary.
+     * </p>
+     *
+     * @param variables A map containing variables that are used to resolve the Record Schema dynamically via Expression Language.
      *                 This can be null or empty.
-     * @param in InputStream containing Records. This can be null or empty stream.
-     * @param logger A logger bind to a component
+     * @param in InputStream containing Records.
+     * @param logger A logger bound to a component
+     *
      * @return Created RecordReader instance
      */
     RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
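A minimal sketch of the calling pattern this Javadoc recommends: the stream returned by ProcessSession.read is handed to the factory without a BufferedInputStream wrapper, since it already supports mark/reset. The surrounding processor, the readerFactory field, and the error handling are illustrative assumptions, not code introduced by this commit.

    // Sketch: inside a processor's onTrigger, with readerFactory resolved from a RecordReaderFactory property
    try (final InputStream in = session.read(flowFile);
         final RecordReader reader = readerFactory.createRecordReader(flowFile.getAttributes(), in, getLogger())) {

        // The session-provided stream supports mark()/reset() for any number of bytes without
        // buffering the content in memory, so no extra wrapping is needed here.
        Record record;
        while ((record = reader.nextRecord()) != null) {
            // handle each record
        }
    } catch (final IOException | MalformedRecordException | SchemaNotFoundException e) {
        throw new ProcessException("Failed to read records from " + flowFile, e);
    }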
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSchemaCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSchemaCacheService.java
new file mode 100644 (file)
index 0000000..87925ae
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.Optional;
+
+public interface RecordSchemaCacheService extends ControllerService {
+    public static final String CACHE_IDENTIFIER_ATTRIBUTE = "schema.cache.identifier";
+
+    /**
+     * Updates the cache to include the given Record Schema and returns an identifier
+     * for the Schema. If the schema already exists in the cache, the existing identifier
+     * is returned. Otherwise, the schema is added to the cache and a new identifier is
+     * created and returned. This identifier can then be used to retrieve the Record Schema
+     * via the {@link #getSchema(String)} method.
+     *
+     * @param schema the schema to cache
+     * @return a unique identifier for the schema
+     */
+    String cacheSchema(RecordSchema schema);
+
+    /**
+     * Returns the Schema with the given identifier, if it can be found in the cache.
+     * Note that the cache may choose to evict schemas for any number of reasons and, as such,
+     * the service may return an empty Optional even immediately after the Schema is cached
+     * via the {@link #cacheSchema(RecordSchema)} method.
+     *
+     * @param schemaIdentifier the identifier of the schema
+     * @return an Optional holding the Record Schema with the given identifier, if it can be found,
+     * or an empty Optional if the schema cannot be found
+     */
+    Optional<RecordSchema> getSchema(String schemaIdentifier);
+
+}
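A brief usage sketch of the contract above: the component that writes records caches the schema and stamps the returned identifier onto the FlowFile, while a downstream component attempts to resolve it and falls back when the entry has been evicted. The schemaCacheService variable and the deriveSchemaFromContent() fallback are hypothetical names used only for illustration.

    // Producer side (sketch): cache the schema and expose its identifier as a FlowFile attribute
    final String schemaId = schemaCacheService.cacheSchema(recordSchema);
    flowFile = session.putAttribute(flowFile, RecordSchemaCacheService.CACHE_IDENTIFIER_ATTRIBUTE, schemaId);

    // Consumer side (sketch): the cache may have evicted the entry, so always handle the empty Optional
    final String cachedId = flowFile.getAttribute(RecordSchemaCacheService.CACHE_IDENTIFIER_ATTRIBUTE);
    final Optional<RecordSchema> cached = cachedId == null ? Optional.<RecordSchema>empty() : schemaCacheService.getSchema(cachedId);
    final RecordSchema schema = cached.orElseGet(() -> deriveSchemaFromContent()); // hypothetical fallback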
index b16464e..d7c3d5b 100755 (executable)
@@ -1,13 +1,13 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
-        contributor license agreements. See the NOTICE file distributed with this 
-        work for additional information regarding copyright ownership. The ASF licenses 
-        this file to You under the Apache License, Version 2.0 (the "License"); you 
-        may not use this file except in compliance with the License. You may obtain 
-        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
-        required by applicable law or agreed to in writing, software distributed 
-        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
-        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements. See the NOTICE file distributed with this
+        work for additional information regarding copyright ownership. The ASF licenses
+        this file to You under the Apache License, Version 2.0 (the "License"); you
+        may not use this file except in compliance with the License. You may obtain
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+        required by applicable law or agreed to in writing, software distributed
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for
         the specific language governing permissions and limitations under the License. -->
     <modelVersion>4.0.0</modelVersion>
     <parent>
                         <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/multi-bank-account_escapedchar.csv</exclude>
+                        <exclude>src/test/resources/csv/prov-events.csv</exclude>
                         <exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
                         <exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
                         <exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
                         <exclude>src/test/resources/json/output/dataTypes.json</exclude>
                         <exclude>src/test/resources/json/elements-for-record-choice.json</exclude>
                         <exclude>src/test/resources/json/record-choice.avsc</exclude>
+                        <exclude>src/test/resources/json/prov-events.json</exclude>
+                        <exclude>src/test/resources/json/docs-example.json</exclude>
                         <exclude>src/test/resources/syslog/syslog5424/log.txt</exclude>
                         <exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
                         <exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>
index 434d73f..11379b9 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -85,7 +85,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry schemaRegistry, ConfigurationContext context) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry schemaRegistry, PropertyContext context) {
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(strategy)) {
             return new EmbeddedAvroSchemaAccessStrategy();
         } else {
@@ -94,15 +94,6 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ValidationContext context) {
-        if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
-            return new EmbeddedAvroSchemaAccessStrategy();
-        } else {
-            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
-        }
-    }
-
-    @Override
     public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
         final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue();
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
index 9c31cca..624981b 100644 (file)
 
 package org.apache.nifi.csv;
 
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.io.input.BOMInputStream;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -39,19 +29,24 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
     private static final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
 
-    private final ConfigurationContext context;
+    private final PropertyContext context;
 
-    public CSVHeaderSchemaStrategy(final ConfigurationContext context) {
+    public CSVHeaderSchemaStrategy(final PropertyContext context) {
         this.context = context;
     }
 
-    public CSVHeaderSchemaStrategy(final ValidationContext context) {
-        this.context = null;
-    }
-
     @Override
     public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
         if (this.context == null) {
index 858ef70..af46147 100644 (file)
 
 package org.apache.nifi.csv;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
+import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schema.inference.TimeValueInference;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.RecordReader;
@@ -45,6 +44,12 @@ import org.apache.nifi.serialization.SchemaRegistryService;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.stream.io.NonCloseableInputStream;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 @Tags({"csv", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
 @CapabilityDescription("Parses CSV-formatted data, returning each row in the CSV file as a separate record. "
     + "This reader assumes that the first line in the content is the column names and all subsequent lines are "
@@ -117,7 +122,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         // Ensure that if we are deriving schema from header that we always treat the first line as a header,
         // regardless of the 'First Line is Header' property
         final String accessStrategy = context.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
-        if (headerDerivedAllowableValue.getValue().equals(accessStrategy)) {
+        if (headerDerivedAllowableValue.getValue().equals(accessStrategy) || SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(accessStrategy)) {
             this.csvFormat = this.csvFormat.withFirstRecordAsHeader();
             this.firstLineIsHeader = true;
         }
@@ -126,33 +131,27 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     @Override
     public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
         // The provided InputStream must support mark/reset (for example, a BufferedInputStream) because we may read ahead here to determine the schema.
-        final BufferedInputStream bufferedIn = new BufferedInputStream(in);
-        bufferedIn.mark(1024 * 1024);
-        final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null);
-        bufferedIn.reset();
+        in.mark(1024 * 1024);
+        final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null);
+        in.reset();
 
         if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
-            return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+            return new CSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
         } else if(JACKSON_CSV.getValue().equals(csvParser)) {
-            return new JacksonCSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+            return new JacksonCSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
         } else {
             throw new IOException("Parser not supported");
         }
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (strategy.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
-            return new CSVHeaderSchemaStrategy(context);
-        }
-
-        return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
-    }
-
-    @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
         if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
             return new CSVHeaderSchemaStrategy(context);
+        } else if (allowableValue.equalsIgnoreCase(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
+            final RecordSourceFactory<CSVRecord> sourceFactory = (var, in) -> new CSVRecordSource(in, context);
+            final SchemaInferenceEngine<CSVRecord> inference = new CSVSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
+            return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
         }
 
         return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
@@ -162,11 +161,12 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     protected List<AllowableValue> getSchemaAccessStrategyValues() {
         final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
         allowableValues.add(headerDerivedAllowableValue);
+        allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
         return allowableValues;
     }
 
     @Override
     protected AllowableValue getDefaultSchemaAccessStrategy() {
-        return headerDerivedAllowableValue;
+        return SchemaInferenceUtil.INFER_SCHEMA;
     }
 }
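
InferSchemaAccessStrategy is introduced elsewhere in this commit, so only its expected contract is sketched here: the RecordSourceFactory adapts the raw InputStream into a typed RecordSource, and the SchemaInferenceEngine folds each record it yields into a RecordSchema. The create(...) method name on RecordSourceFactory is an assumption; the other types and methods mirror the code above.

    import org.apache.nifi.schema.inference.RecordSource;
    import org.apache.nifi.schema.inference.RecordSourceFactory;
    import org.apache.nifi.schema.inference.SchemaInferenceEngine;
    import org.apache.nifi.serialization.record.RecordSchema;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Map;

    final class InferenceSketch<T> {
        private final RecordSourceFactory<T> sourceFactory;
        private final SchemaInferenceEngine<T> engine;

        InferenceSketch(final RecordSourceFactory<T> sourceFactory, final SchemaInferenceEngine<T> engine) {
            this.sourceFactory = sourceFactory;
            this.engine = engine;
        }

        // Mirrors what the reader expects when the Infer Schema strategy is selected:
        // build a RecordSource over the stream, then let the engine derive the schema from it.
        RecordSchema infer(final Map<String, String> variables, final InputStream in) throws IOException {
            final RecordSource<T> source = sourceFactory.create(variables, in); // assumed factory method name
            return engine.inferSchema(source);
        }
    }
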
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSource.java
new file mode 100644 (file)
index 0000000..ab4362a
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.io.input.BOMInputStream;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.inference.RecordSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+
+public class CSVRecordSource implements RecordSource<CSVRecord> {
+    private final Iterator<CSVRecord> csvRecordIterator;
+
+    public CSVRecordSource(final InputStream in, final PropertyContext context) throws IOException {
+        final String charset = context.getProperty(CSVUtils.CHARSET).getValue();
+
+        final Reader reader;
+        try {
+            reader = new InputStreamReader(new BOMInputStream(in), charset);
+        } catch (UnsupportedEncodingException e) {
+            throw new ProcessException(e);
+        }
+
+        final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader().withTrim();
+        final CSVParser csvParser = new CSVParser(reader, csvFormat);
+        csvRecordIterator = csvParser.iterator();
+    }
+
+    @Override
+    public CSVRecord next() {
+        if (csvRecordIterator.hasNext()) {
+            final CSVRecord record = csvRecordIterator.next();
+            return record;
+        }
+
+        return null;
+    }
+}
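
A short usage sketch for the record source above, assuming the supplied PropertyContext carries the usual CSVUtils properties (character set, delimiter, and so on); next() returning null is the end-of-input signal:

    import org.apache.commons.csv.CSVRecord;
    import org.apache.nifi.context.PropertyContext;
    import org.apache.nifi.csv.CSVRecordSource;

    import java.io.IOException;
    import java.io.InputStream;

    final class CsvRecordSourceExample {
        // Drains the source, printing each row as a header-name-to-value map.
        static void printAll(final InputStream in, final PropertyContext context) throws IOException {
            final CSVRecordSource source = new CSVRecordSource(in, context);
            for (CSVRecord record = source.next(); record != null; record = source.next()) {
                System.out.println(record.toMap());
            }
        }
    }
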
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
new file mode 100644 (file)
index 0000000..b81ddc0
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.nifi.schema.inference.FieldTypeInference;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
+import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecord> {
+
+    private final TimeValueInference timeValueInference;
+
+    public CSVSchemaInference(final TimeValueInference timeValueInference) {
+        this.timeValueInference = timeValueInference;
+    }
+
+
+    @Override
+    public RecordSchema inferSchema(final RecordSource<CSVRecord> recordSource) throws IOException {
+        final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>();
+        while (true) {
+            final CSVRecord rawRecord = recordSource.next();
+            if (rawRecord == null) {
+                break;
+            }
+
+            inferSchema(rawRecord, typeMap);
+        }
+
+        return createSchema(typeMap);
+    }
+
+
+    private void inferSchema(final CSVRecord csvRecord, final Map<String, FieldTypeInference> typeMap) {
+        final Map<String, String> values = csvRecord.toMap();
+        for (final Map.Entry<String, String> entry : values.entrySet()) {
+            final String value = entry.getValue();
+            if (value == null) {
+                // A null value tells us nothing about this column's type, so skip the field rather than abandoning the rest of the row
+                continue;
+            }
+
+            final String fieldName = entry.getKey();
+            final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
+            final String trimmed = trim(value);
+            final DataType dataType = getDataType(trimmed);
+            typeInference.addPossibleDataType(dataType);
+        }
+    }
+
+    private String trim(String value) {
+        return (value.length() > 1) && value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+    }
+
+
+    private DataType getDataType(final String value) {
+        if (value == null || value.isEmpty()) {
+            return null;
+        }
+
+        if (NumberUtils.isParsable(value)) {
+            if (value.contains(".")) {
+                try {
+                    final double doubleValue = Double.parseDouble(value);
+                    // Float.MIN_VALUE is the smallest positive float, so the float range check must use -Float.MAX_VALUE as its lower bound
+                    if (doubleValue > Float.MAX_VALUE || doubleValue < -Float.MAX_VALUE) {
+                        return RecordFieldType.DOUBLE.getDataType();
+                    }
+
+                    return RecordFieldType.FLOAT.getDataType();
+                } catch (final NumberFormatException nfe) {
+                    return RecordFieldType.STRING.getDataType();
+                }
+            }
+
+            try {
+                final long longValue = Long.parseLong(value);
+                if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
+                    return RecordFieldType.LONG.getDataType();
+                }
+
+                return RecordFieldType.INT.getDataType();
+            } catch (final NumberFormatException nfe) {
+                return RecordFieldType.STRING.getDataType();
+            }
+        }
+
+        if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+
+        final Optional<DataType> timeDataType = timeValueInference.getDataType(value);
+        return timeDataType.orElse(RecordFieldType.STRING.getDataType());
+    }
+
+
+    private RecordSchema createSchema(final Map<String, FieldTypeInference> inferences) {
+        final List<RecordField> recordFields = new ArrayList<>(inferences.size());
+        inferences.forEach((fieldName, type) -> recordFields.add(new RecordField(fieldName, type.toDataType(), true)));
+        return new SimpleRecordSchema(recordFields);
+    }
+
+}
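
As a worked example of the rules above: for the sample rows below, the id column sees only integral values and stays INT, score sees 3 and then 4.5 and should be widened to FLOAT (assuming FieldTypeInference, defined elsewhere in this commit, resolves to the broader of the observed types), and active becomes BOOLEAN. A sketch that drives the engine directly, again assuming the PropertyContext provides the standard CSV properties:

    import org.apache.nifi.context.PropertyContext;
    import org.apache.nifi.csv.CSVRecordSource;
    import org.apache.nifi.csv.CSVSchemaInference;
    import org.apache.nifi.schema.inference.TimeValueInference;
    import org.apache.nifi.serialization.record.RecordSchema;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    final class CsvInferenceExample {
        static RecordSchema inferFromSample(final PropertyContext context) throws IOException {
            final String csv = "id,score,active\n1,3,true\n2,4.5,false\n";
            final CSVSchemaInference inference = new CSVSchemaInference(
                new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss"));
            return inference.inferSchema(new CSVRecordSource(
                new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)), context));
        }
    }
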
index 9e7293b..a462c72 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
@@ -240,16 +241,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
-        if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
-            return createAccessStrategy();
-        } else {
-            return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
-        }
-    }
-
-    @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
         if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
             return createAccessStrategy();
         } else {
index 4f9a791..9874a02 100644 (file)
 
 package org.apache.nifi.json;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
-
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -46,11 +38,16 @@ import org.codehaus.jackson.JsonToken;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.node.ArrayNode;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
     private final ComponentLog logger;
     private final JsonParser jsonParser;
-    private final boolean array;
     private final JsonNode firstJsonNode;
 
     private boolean firstObjectConsumed = false;
@@ -58,7 +55,10 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
     private static final JsonFactory jsonFactory = new JsonFactory();
     private static final ObjectMapper codec = new ObjectMapper();
 
-    public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
+
+    public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
+            throws IOException, MalformedRecordException {
+
         this.logger = logger;
 
         try {
@@ -67,10 +67,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
 
             JsonToken token = jsonParser.nextToken();
             if (token == JsonToken.START_ARRAY) {
-                array = true;
                 token = jsonParser.nextToken(); // advance to START_OBJECT token
-            } else {
-                array = false;
             }
 
             if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
@@ -87,13 +84,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         final JsonNode nextNode = getNextJsonNode();
+        if (nextNode == null) {
+            return null;
+        }
+
         final RecordSchema schema = getSchema();
         try {
             return convertJsonNodeToRecord(nextNode, schema, coerceTypes, dropUnknownFields);
         } catch (final MalformedRecordException mre) {
             throw mre;
-        } catch (final IOException ioe) {
-            throw ioe;
         } catch (final Exception e) {
             logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", new Object[] {nextNode, schema, e.toString(), e});
             throw new MalformedRecordException("Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema", e);
@@ -200,7 +199,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
     }
 
 
-    private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
+    protected JsonNode getNextJsonNode() throws IOException, MalformedRecordException {
         if (!firstObjectConsumed) {
             firstObjectConsumed = true;
             return firstJsonNode;
@@ -227,23 +226,10 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
     }
 
-
     @Override
     public void close() throws IOException {
         jsonParser.close();
     }
 
-    protected JsonParser getJsonParser() {
-        return jsonParser;
-    }
-
-    protected JsonFactory getJsonFactory() {
-        return jsonFactory;
-    }
-
-    protected Optional<JsonNode> getFirstJsonNode() {
-        return Optional.ofNullable(firstJsonNode);
-    }
-
     protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException;
 }
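
With the null check added to nextRecord() above, readers built on this base class signal end of input by returning null, matching the RecordReader contract, so the usual consumption loop is simply:

    import org.apache.nifi.serialization.MalformedRecordException;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.record.Record;

    import java.io.IOException;

    final class ReaderLoopExample {
        // Standard consumption pattern: nextRecord() returns null once the last JSON object has been read.
        static int countRecords(final RecordReader reader) throws IOException, MalformedRecordException {
            int count = 0;
            for (Record record = reader.nextRecord(); record != null; record = reader.nextRecord()) {
                count++;
            }
            return count;
        }
    }
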
index d8f21ca..a59bc19 100644 (file)
 
 package org.apache.nifi.json;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.jayway.jsonpath.JsonPath;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
+import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.SchemaRegistryService;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.codehaus.jackson.JsonNode;
 
-import com.jayway.jsonpath.JsonPath;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
 
 @Tags({"json", "jsonpath", "record", "reader", "parser"})
 @CapabilityDescription("Parses JSON records and evaluates user-defined JSON Path's against each JSON object. While the reader expects each record "
@@ -61,6 +71,7 @@ import com.jayway.jsonpath.JsonPath;
     expressionLanguageScope=ExpressionLanguageScope.NONE)
 public class JsonPathReader extends SchemaRegistryService implements RecordReaderFactory {
 
+
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
@@ -130,9 +141,31 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
     }
 
     @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
+        allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
+        return allowableValues;
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+        final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in);
+        final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
+
+        return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
+            () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, context));
+    }
+
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SchemaInferenceUtil.INFER_SCHEMA;
+    }
+
+    @Override
     public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
-        final RecordSchema schema = getSchema(variables, in, null);
-        return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat);
+        final InputStream bufferedIn = new BufferedInputStream(in);
+        final RecordSchema schema = getSchema(variables, bufferedIn, null);
+        return new JsonPathRowRecordReader(jsonPaths, schema, bufferedIn, logger, dateFormat, timeFormat, timestampFormat);
     }
 
 }
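
SchemaInferenceUtil.getSchemaAccessStrategy is also introduced elsewhere in this commit; from the call site above it is expected to behave roughly as sketched below, returning an InferSchemaAccessStrategy built from the supplied source factory and inference engine when the Infer Schema strategy is selected, and otherwise deferring to the fallback supplier. The body is an illustration of that contract, not the shipped implementation:

    import org.apache.nifi.context.PropertyContext;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.schema.access.SchemaAccessStrategy;
    import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
    import org.apache.nifi.schema.inference.RecordSourceFactory;
    import org.apache.nifi.schema.inference.SchemaInferenceEngine;
    import org.apache.nifi.schema.inference.SchemaInferenceUtil;

    import java.util.function.Supplier;

    final class InferenceDispatchSketch {
        // The PropertyContext parameter mirrors the real call site even though this simplified sketch does not use it.
        static <T> SchemaAccessStrategy select(final String strategy, final PropertyContext context, final ComponentLog logger,
                                               final RecordSourceFactory<T> sourceFactory,
                                               final Supplier<SchemaInferenceEngine<T>> inferenceSupplier,
                                               final Supplier<SchemaAccessStrategy> fallback) {
            if (SchemaInferenceUtil.INFER_SCHEMA.getValue().equalsIgnoreCase(strategy)) {
                return new InferSchemaAccessStrategy<>(sourceFactory, inferenceSupplier.get(), logger);
            }
            return fallback.get();
        }
    }
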
index 90ab799..a664b88 100644 (file)
 
 package org.apache.nifi.json;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.text.DateFormat;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
-
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -43,11 +37,16 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.codehaus.jackson.JsonNode;
 
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.PathNotFoundException;
-import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
 
 public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
@@ -62,9 +61,9 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
     public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
-        final String dateFormat, final String timeFormat, final String timestampFormat)
-        throws MalformedRecordException, IOException {
-        super(in, logger);
+                final String dateFormat, final String timeFormat, final String timestampFormat)
+                throws MalformedRecordException, IOException {
+        super(in, logger, dateFormat, timeFormat, timestampFormat);
 
         final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
         final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
@@ -91,7 +90,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     }
 
     @Override
-    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields) throws IOException {
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields) {
         if (jsonNode == null) {
             return null;
         }
@@ -118,12 +117,12 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
             }
 
             final Optional<RecordField> field = schema.getField(fieldName);
-            final Object defaultValue = field.isPresent() ? field.get().getDefaultValue() : null;
+            final Object defaultValue = field.map(RecordField::getDefaultValue).orElse(null);
 
             if (coerceTypes && desiredType != null) {
                 value = convert(value, desiredType, fieldName, defaultValue);
             } else {
-                final DataType dataType = field.isPresent() ? field.get().getDataType() : null;
+                final DataType dataType = field.map(RecordField::getDataType).orElse(null);
                 value = convert(value, dataType);
             }
 
@@ -232,7 +231,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
                 final Optional<DataType> desiredTypeOption = childSchema.getDataType(key);
                 if (desiredTypeOption.isPresent()) {
                     final Optional<RecordField> field = childSchema.getField(key);
-                    final Object defaultFieldValue = field.isPresent() ? field.get().getDefaultValue() : null;
+                    final Object defaultFieldValue = field.map(RecordField::getDefaultValue).orElse(null);
 
                     final Object coercedValue = convert(entry.getValue(), desiredTypeOption.get(), fieldName + "." + key, defaultFieldValue);
                     coercedValues.put(key, coercedValue);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
new file mode 100644 (file)
index 0000000..3887fb0
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+import org.apache.nifi.schema.inference.RecordSource;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final JsonFactory jsonFactory;
+    private final JsonParser jsonParser;
+
+    static {
+        jsonFactory = new JsonFactory();
+        jsonFactory.setCodec(new ObjectMapper());
+    }
+
+    public JsonRecordSource(final InputStream in) throws IOException {
+        jsonParser = jsonFactory.createJsonParser(in);
+    }
+
+    @Override
+    public JsonNode next() throws IOException {
+        while (true) {
+            final JsonToken token = jsonParser.nextToken();
+            if (token == null) {
+                return null;
+            }
+
+            if (token == JsonToken.START_OBJECT) {
+                return jsonParser.readValueAsTree();
+            }
+        }
+    }
+}
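
Because next() scans forward to the next START_OBJECT token and then consumes that whole object with readValueAsTree(), nested objects are folded into their parent record rather than emitted separately, and a top-level array wrapper is skipped transparently. A small usage sketch:

    import org.apache.nifi.json.JsonRecordSource;
    import org.codehaus.jackson.JsonNode;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    final class JsonRecordSourceExample {
        static int countRecords(final String json) throws IOException {
            final JsonRecordSource source = new JsonRecordSource(
                new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)));
            int count = 0;
            for (JsonNode node = source.next(); node != null; node = source.next()) {
                count++;
            }
            return count;
        }

        public static void main(final String[] args) throws IOException {
            // Prints 2: the nested "address" object stays inside the second record rather than becoming a third one.
            System.out.println(countRecords("[{\"name\":\"a\"},{\"name\":\"b\",\"address\":{\"city\":\"x\"}}]"));
        }
    }
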
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
new file mode 100644 (file)
index 0000000..b09c79f
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+import org.apache.nifi.schema.inference.HierarchicalSchemaInference;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+public class JsonSchemaInference extends HierarchicalSchemaInference<JsonNode> {
+
+    private final TimeValueInference timeValueInference;
+
+    public JsonSchemaInference(final TimeValueInference timeValueInference) {
+        this.timeValueInference = timeValueInference;
+    }
+
+
+    protected DataType getDataType(final JsonNode jsonNode) {
+        if (jsonNode.isTextual()) {
+            final String text = jsonNode.getTextValue();
+            if (text == null) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            final Optional<DataType> timeDataType = timeValueInference.getDataType(text);
+            return timeDataType.orElse(RecordFieldType.STRING.getDataType());
+        }
+
+        if (jsonNode.isObject()) {
+            final RecordSchema schema = createSchema(jsonNode);
+            return RecordFieldType.RECORD.getRecordDataType(schema);
+        }
+
+        if (jsonNode.isIntegralNumber()) {
+            return RecordFieldType.LONG.getDataType();
+        }
+
+        if (jsonNode.isDouble()) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (jsonNode.isFloatingPointNumber()) {
+            // Also true for doubles, so this must come after the more specific isDouble() check
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (jsonNode.isBinary()) {
+            return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+        }
+        if (jsonNode.isBoolean()) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+
+        return null;
+    }
+
+    @Override
+    protected boolean isObject(final JsonNode value) {
+        return value.isObject();
+    }
+
+    @Override
+    protected boolean isArray(final JsonNode value) {
+        return value.isArray();
+    }
+
+    @Override
+    protected void forEachFieldInRecord(final JsonNode rawRecord, final BiConsumer<String, JsonNode> fieldConsumer) {
+        final Iterator<Map.Entry<String, JsonNode>> itr = rawRecord.getFields();
+        while (itr.hasNext()) {
+            final Map.Entry<String, JsonNode> entry = itr.next();
+            final String fieldName = entry.getKey();
+            final JsonNode value = entry.getValue();
+
+            fieldConsumer.accept(fieldName, value);
+        }
+    }
+
+    @Override
+    protected void forEachRawRecordInArray(final JsonNode arrayRecord, final Consumer<JsonNode> rawRecordConsumer) {
+        final ArrayNode arrayNode = (ArrayNode) arrayRecord;
+        for (final JsonNode element : arrayNode) {
+            rawRecordConsumer.accept(element);
+        }
+    }
+
+    @Override
+    protected String getRootName(final JsonNode rawRecord) {
+        return null;
+    }
+}
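
For a concrete sense of the mapping above, inferring over the sample document below should yield name as STRING, count as LONG, active as BOOLEAN, and address as a nested RECORD whose own schema contains city as STRING. The date/time format strings passed to TimeValueInference are arbitrary placeholders here:

    import org.apache.nifi.json.JsonRecordSource;
    import org.apache.nifi.json.JsonSchemaInference;
    import org.apache.nifi.schema.inference.TimeValueInference;
    import org.apache.nifi.serialization.record.RecordSchema;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    final class JsonInferenceExample {
        public static void main(final String[] args) throws IOException {
            final String json = "{\"name\":\"abc\",\"count\":42,\"active\":true,\"address\":{\"city\":\"x\"}}";
            final JsonSchemaInference inference = new JsonSchemaInference(
                new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss"));
            final RecordSchema schema = inference.inferSchema(
                new JsonRecordSource(new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))));
            // Expected under the rules above: name -> STRING, count -> LONG, active -> BOOLEAN,
            // address -> RECORD with a nested schema containing city -> STRING.
            System.out.println(schema.getFields());
        }
    }
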