[HUDI-4551] Tweak the default parallelism of flink pipeline to execution env parallelism
author Nicholas Jiang <programgeek@163.com>
Thu, 18 Aug 2022 08:13:28 +0000 (16:13 +0800)
committer GitHub <noreply@github.com>
Thu, 18 Aug 2022 08:13:28 +0000 (16:13 +0800)
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java [new file with mode: 0644]
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java

index 3638113288554afd199541728b936b9b4426c96d..7b78fb8d6a288ceaa4ab6a29f029c78c47654ee7 100644 (file)
@@ -153,8 +153,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
       .key("read.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual read, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");
 
   public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
       .key("source.avro-schema.path")
@@ -395,19 +395,19 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.index_bootstrap.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do index bootstrap, default is the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do index bootstrap, default same as the sink parallelism");
 
   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
       .key("write.bucket_assign.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
 
   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
       .key("write.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual write, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");
 
   public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
       .key("write.task.max.size")
@@ -512,8 +512,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
-      .withDescription("Parallelism of tasks that do actual compaction, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
 
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
@@ -630,8 +630,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
       .key("clustering.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual clustering, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual clustering, default same as the write task parallelism");
 
   public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
       .key("clustering.plan.strategy.daybased.lookback.partitions")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
new file mode 100644 (file)
index 0000000..3e02d23
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.configuration;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Tool helping to infer the flink options {@link FlinkOptions}.
+ */
+public class OptionsInference {
+
+  /**
+   * Sets up the default source task parallelism if it is not specified.
+   *
+   * @param conf     The configuration
+   * @param envTasks The parallelism of the execution env
+   */
+  public static void setupSourceTasks(Configuration conf, int envTasks) {
+    if (!conf.contains(FlinkOptions.READ_TASKS)) {
+      conf.setInteger(FlinkOptions.READ_TASKS, envTasks);
+    }
+  }
+
+  /**
+   * Sets up the default sink tasks parallelism if it is not specified.
+   *
+   * @param conf     The configuration
+   * @param envTasks The parallelism of the execution env
+   */
+  public static void setupSinkTasks(Configuration conf, int envTasks) {
+    // write task number, default same as execution env tasks
+    if (!conf.contains(FlinkOptions.WRITE_TASKS)) {
+      conf.setInteger(FlinkOptions.WRITE_TASKS, envTasks);
+    }
+    int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+    // bucket assign tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.BUCKET_ASSIGN_TASKS)) {
+      conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, writeTasks);
+    }
+    // compaction tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.COMPACTION_TASKS)) {
+      conf.setInteger(FlinkOptions.COMPACTION_TASKS, writeTasks);
+    }
+    // clustering tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.CLUSTERING_TASKS)) {
+      conf.setInteger(FlinkOptions.CLUSTERING_TASKS, writeTasks);
+    }
+  }
+}
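
A rough illustration of the inference chain this helper implements. This is a sketch, assuming an execution environment parallelism of 8 and a user who only sets write.tasks; the resulting values follow directly from the code above:

  import org.apache.flink.configuration.Configuration;
  import org.apache.hudi.configuration.FlinkOptions;
  import org.apache.hudi.configuration.OptionsInference;

  Configuration conf = new Configuration();
  conf.setInteger(FlinkOptions.WRITE_TASKS, 2);      // the only option set explicitly

  OptionsInference.setupSourceTasks(conf, 8);        // read.tasks is absent -> falls back to env parallelism
  OptionsInference.setupSinkTasks(conf, 8);          // write.tasks already set -> stays 2

  conf.getInteger(FlinkOptions.READ_TASKS);          // 8, the env parallelism
  conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS); // 2, follows write.tasks
  conf.getInteger(FlinkOptions.COMPACTION_TASKS);    // 2, follows write.tasks
  conf.getInteger(FlinkOptions.CLUSTERING_TASKS);    // 2, follows write.tasks
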
index 0341d0af7f87d61d153934b81b34d8e5df695046..18b27daeb9523291e7da18846ce4749b16310c25 100644 (file)
@@ -209,9 +209,8 @@ public class Pipelines {
   public static DataStream<HoodieRecord> bootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream) {
-    return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
+    return bootstrap(conf, rowType, dataStream, false, false);
   }
 
   /**
@@ -221,7 +220,6 @@ public class Pipelines {
    *
    * @param conf               The configuration
    * @param rowType            The row type
-   * @param defaultParallelism The default parallelism
    * @param dataStream         The data stream
    * @param bounded            Whether the source is bounded
    * @param overwrite          Whether it is insert overwrite
@@ -229,7 +227,6 @@ public class Pipelines {
   public static DataStream<HoodieRecord> bootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream,
       boolean bounded,
       boolean overwrite) {
@@ -237,16 +234,15 @@ public class Pipelines {
     if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
       return rowDataToHoodieRecord(conf, rowType, dataStream);
     } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
-      return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
+      return boundedBootstrap(conf, rowType, dataStream);
     } else {
-      return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
+      return streamBootstrap(conf, rowType, dataStream, bounded);
     }
   }
 
   private static DataStream<HoodieRecord> streamBootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream,
       boolean bounded) {
     DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
@@ -257,7 +253,7 @@ public class Pipelines {
               "index_bootstrap",
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
-          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
           .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
     }
 
@@ -272,7 +268,6 @@ public class Pipelines {
   private static DataStream<HoodieRecord> boundedBootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream<RowData> dataStream) {
     final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
     // shuffle by partition keys
@@ -284,7 +279,7 @@ public class Pipelines {
             "batch_index_bootstrap",
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
-        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
         .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
   }
 
@@ -315,11 +310,10 @@ public class Pipelines {
    * and flushes the data set to disk.
    *
    * @param conf               The configuration
-   * @param defaultParallelism The default parallelism
    * @param dataStream         The input data stream
    * @return the stream write data stream pipeline
    */
-  public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
+  public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
     if (OptionsResolver.isBucketIndexType(conf)) {
       WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -339,7 +333,7 @@ public class Pipelines {
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
           .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
-          .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
+          .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
           .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
index 29f55f78acf17613b0115ec818f73c40a074db47..b153b2273cf6b7e4c10002f462e308355c363a30 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -76,7 +77,6 @@ public class HoodieFlinkStreamer {
             .getLogicalType();
 
     long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
-    int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
     DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
@@ -98,8 +98,9 @@ public class HoodieFlinkStreamer {
       }
     }
 
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
-    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    OptionsInference.setupSinkTasks(conf, env.getParallelism());
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
     if (OptionsResolver.needsAsyncCompaction(conf)) {
       Pipelines.compact(conf, pipeline);
     } else {
index 5af86867d86d6a5077007394b1bbcf4d2dd82d02..f8799d3ac940ae994c08f3d10af428166288116e 100644 (file)
@@ -22,6 +22,7 @@ import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.ChangelogModes;
@@ -66,6 +67,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       long ckpTimeout = dataStream.getExecutionEnvironment()
           .getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+      // set up default parallelism
+      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
 
       RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
 
@@ -85,14 +88,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
         }
       }
 
-      // default parallelism
-      int parallelism = dataStream.getExecutionConfig().getParallelism();
       DataStream<Object> pipeline;
       // bootstrap
       final DataStream<HoodieRecord> hoodieRecordDataStream =
-          Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
+          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
       // write pipeline
-      pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
       // compaction
       if (OptionsResolver.needsAsyncCompaction(conf)) {
         // use synchronous compaction for bounded source.
index 2034cb322eb8e91800ee97139e79dee097795b54..2deb33d842cf6beae7278ad3ef41106a6d87cde3 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.source.FileIndex;
@@ -179,6 +180,7 @@ public class HoodieTableSource implements
         @SuppressWarnings("unchecked")
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+        OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
         if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
           StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
               conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
index 680c4d02e238bc58ce9eacfe9c21794b41202f4d..aa420a433dd8953c95d3acaccaf09ae6340842fb 100644 (file)
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -239,9 +240,9 @@ public class ITTestDataStreamWrite extends TestLogger {
       dataStream = transformer.get().apply(dataStream);
     }
 
-    int parallelism = execEnv.getParallelism();
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
-    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     if (isMor) {
@@ -305,6 +306,7 @@ public class ITTestDataStreamWrite extends TestLogger {
           .setParallelism(4);
     }
 
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
     DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
     execEnv.addOperator(pipeline.getTransformation());
 
index aba8e4c7b4b929a7977ddd701a3684798be21c53..a0073d8a3703d414aa35251abf570aece36bcc28 100644 (file)
@@ -88,7 +88,7 @@ public class ITTestHoodieFlinkClustering {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
 
@@ -187,7 +187,7 @@ public class ITTestHoodieFlinkClustering {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
 
index a8b78ab64da5bfa55f7ee27d1725218fe440a77f..428f65f37c1d80d87f045b6b104f9cfa85c1578e 100644 (file)
@@ -101,7 +101,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
@@ -173,7 +173,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -214,7 +214,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -294,7 +294,7 @@ public class ITTestHoodieFlinkCompactor {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
index bf3abf74b844849f6a6a4727de98404b5a5a6771..66af39e4c1c18c966510f0bee34bb4344270d71a 100644 (file)
@@ -84,7 +84,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
     streamTableEnv = TableEnvironmentImpl.create(settings);
     streamTableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Configuration execConf = streamTableEnv.getConfig().getConfiguration();
     execConf.setString("execution.checkpointing.interval", "2s");
     // configure not to retry after failure
@@ -93,7 +93,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
 
     batchTableEnv = TestTableEnvs.getBatchTableEnv();
     batchTableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
   }
 
   @TempDir