NIFI-5937 use processor-configured encoding instead of the system default
authorAlex Savitsky <alex.savitsky@scotiabank.com>
Tue, 8 Jan 2019 15:15:45 +0000 (10:15 -0500)
committerEd <edward.berezitsky@gmail.com>
Wed, 9 Jan 2019 04:28:23 +0000 (23:28 -0500)
NIFI-5937 added tests to verify that accented characters are preserved correctly

NIFI-5937 unfolding starred imports

NIFI-5937 unfolding starred imports (now with statics)

Signed-off-by: Ed <edward.berezitsky@gmail.com>
This closes #3250

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java

index ac36604..52de424 100644 (file)
@@ -73,6 +73,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigInteger;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -198,6 +199,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         descriptors.add(ID_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
+        descriptors.add(CHARSET);
         descriptors.add(INDEX_OP);
         descriptors.add(SUPPRESS_NULLS);
 
@@ -313,6 +315,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
         final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
         final StringBuilder sb = new StringBuilder();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         int recordCount = 0;
         try (final InputStream in = session.read(flowFile);
@@ -345,7 +348,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 writeRecord(record, record.getSchema(), generator);
                 generator.flush();
                 generator.close();
-                json.append(out.toString());
+                json.append(out.toString(charset.name()));
 
                 buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
                 recordCount++;
index 862e177..2cc16c1 100644 (file)
  */
 package org.apache.nifi.processors.elasticsearch;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -42,16 +39,21 @@ import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import okhttp3.Call;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Protocol;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
 
-public class TestPutElasticsearchHttpRecord {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+public class TestPutElasticsearchHttpRecord {
     private TestRunner runner;
 
     @After
@@ -61,7 +63,25 @@ public class TestPutElasticsearchHttpRecord {
 
     @Test
     public void testPutElasticSearchOnTriggerIndex() throws IOException {
-        runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+        PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
+        processor.setRecordChecks(record -> {
+            assertEquals(1, record.get("id"));
+            assertEquals("reç1", record.get("name"));
+            assertEquals(101, record.get("code"));
+        }, record -> {
+            assertEquals(2, record.get("id"));
+            assertEquals("ræc2", record.get("name"));
+            assertEquals(102, record.get("code"));
+        }, record -> {
+            assertEquals(3, record.get("id"));
+            assertEquals("rèc3", record.get("name"));
+            assertEquals(103, record.get("code"));
+        }, record -> {
+            assertEquals(4, record.get("id"));
+            assertEquals("rëc4", record.get("name"));
+            assertEquals(104, record.get("code"));
+        });
+        runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
         runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
 
@@ -368,6 +388,7 @@ public class TestPutElasticsearchHttpRecord {
         int statusCode = 200;
         String statusMessage = "OK";
         String expectedUrl = null;
+        Consumer<Map>[] recordChecks;
 
         PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
             this.responseHasFailures = responseHasFailures;
@@ -382,6 +403,11 @@ public class TestPutElasticsearchHttpRecord {
             expectedUrl = url;
         }
 
+        @SafeVarargs
+        final void setRecordChecks(Consumer<Map>... checks) {
+            recordChecks = checks;
+        }
+
         @Override
         protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
             client = mock(OkHttpClient.class);
@@ -391,6 +417,24 @@ public class TestPutElasticsearchHttpRecord {
                 if (statusCode != -1) {
                     Request realRequest = (Request) invocationOnMock.getArguments()[0];
                     assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
+                    if (recordChecks != null) {
+                        final ObjectMapper mapper = new ObjectMapper();
+                        Buffer sink = new Buffer();
+                        realRequest.body().writeTo(sink);
+                        String line;
+                        int recordIndex = 0;
+                        boolean content = false;
+                        while ((line = sink.readUtf8Line()) != null) {
+                            if (content) {
+                                content = false;
+                                if (recordIndex < recordChecks.length) {
+                                    recordChecks[recordIndex++].accept(mapper.readValue(line, Map.class));
+                                }
+                            } else {
+                                content = true;
+                            }
+                        }
+                    }
                     StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
                     sb.append(responseHasFailures);
                     sb.append("\", \"items\": [");
@@ -521,9 +565,9 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("name", RecordFieldType.STRING);
         parser.addSchemaField("code", RecordFieldType.INT);
 
-        parser.addRecord(1, "rec1", 101);
-        parser.addRecord(2, "rec2", 102);
-        parser.addRecord(3, "rec3", 103);
-        parser.addRecord(4, "rec4", 104);
+        parser.addRecord(1, "reç1", 101);
+        parser.addRecord(2, "ræc2", 102);
+        parser.addRecord(3, "rèc3", 103);
+        parser.addRecord(4, "rëc4", 104);
     }
 }