-## *Tips*
-- *Thank you very much for contributing to Apache Hudi.*
-- *Please review https://hudi.apache.org/contribute/how-to-contribute before opening a pull request.*
+### Change Logs
-## What is the purpose of the pull request
+_Describe context and summary for this change. Highlight if any code was copied._
-*(For example: This pull request adds quick-start document.)*
+### Impact
-## Brief change log
+_Describe any public API or user-facing feature change or any performance impact._
-*(for example:)*
- - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
+**Risk level: none | low | medium | high**
-## Verify this pull request
+_Choose one. If medium or high, explain what verification was done to mitigate the risks._
-*(Please pick either of the following options)*
+### Contributor's checklist
-This pull request is a trivial rework / code cleanup without any test coverage.
-
-*(or)*
-
-This pull request is already covered by existing tests, such as *(please describe tests)*.
-
-(or)
-
-This change added tests and can be verified as follows:
-
-*(example:)*
-
- - *Added integration tests for end-to-end.*
- - *Added HoodieClientWriteTest to verify the change.*
- - *Manually verified the change by running a job locally.*
-
-## Committer checklist
-
- - [ ] Has a corresponding JIRA in PR title & commit
-
- - [ ] Commit message is descriptive of the change
-
- - [ ] CI is green
-
- - [ ] Necessary doc changes done or have another open PR
-
- - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
+- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
+- [ ] Change Logs and Impact were stated clearly
+- [ ] Adequate tests were added if applicable
+- [ ] CI passed
sparkProfile: "spark3.2"
flinkProfile: "flink1.14"
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.3"
+ flinkProfile: "flink1.14"
+
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
FLINK_PROFILE: ${{ matrix.flinkProfile }}
- if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 before hadoop upgrade to 3.x
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
- name: Spark SQL Test
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=org.apache.spark.sql.hudi.Test*' -pl hudi-spark-datasource/hudi-spark
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-spark-datasource/hudi-spark
variables:
BUILD_PROFILES: '-Dscala-2.11 -Dspark2 -Dflink1.14'
PLUGIN_OPTS: '-Dcheckstyle.skip=true -Drat.skip=true -Djacoco.skip=true'
- MVN_OPTS_INSTALL: '-T 2.5C -DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS)'
+ MVN_OPTS_INSTALL: '-DskipTests $(BUILD_PROFILES) $(PLUGIN_OPTS)'
MVN_OPTS_TEST: '-fae $(BUILD_PROFILES) $(PLUGIN_OPTS)'
SPARK_VERSION: '2.4.4'
HADOOP_VERSION: '2.7'
jobs:
- job: UT_FT_1
displayName: UT FT common & flink & UT client/spark-client
- timeoutInMinutes: '120'
+ timeoutInMinutes: '150'
steps:
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: maven install
inputs:
mavenPomFile: 'pom.xml'
options: $(MVN_OPTS_INSTALL)
publishJUnitResults: false
jdkVersionOption: '1.8'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: UT common flink client/spark-client
inputs:
mavenPomFile: 'pom.xml'
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: FT common flink
inputs:
mavenPomFile: 'pom.xml'
mavenOptions: '-Xmx4g'
- job: UT_FT_2
displayName: FT client/spark-client
- timeoutInMinutes: '120'
+ timeoutInMinutes: '150'
steps:
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: maven install
inputs:
mavenPomFile: 'pom.xml'
options: $(MVN_OPTS_INSTALL)
publishJUnitResults: false
jdkVersionOption: '1.8'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: FT client/spark-client
inputs:
mavenPomFile: 'pom.xml'
mavenOptions: '-Xmx4g'
- job: UT_FT_3
displayName: UT FT clients & cli & utilities & sync
- timeoutInMinutes: '120'
+ timeoutInMinutes: '150'
steps:
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: maven install
inputs:
mavenPomFile: 'pom.xml'
options: $(MVN_OPTS_INSTALL)
publishJUnitResults: false
jdkVersionOption: '1.8'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: UT clients & cli & utilities & sync
inputs:
mavenPomFile: 'pom.xml'
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: FT clients & cli & utilities & sync
inputs:
mavenPomFile: 'pom.xml'
mavenOptions: '-Xmx4g'
- job: UT_FT_4
displayName: UT FT other modules
- timeoutInMinutes: '120'
+ timeoutInMinutes: '150'
steps:
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: maven install
inputs:
mavenPomFile: 'pom.xml'
options: $(MVN_OPTS_INSTALL)
publishJUnitResults: false
jdkVersionOption: '1.8'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: UT other modules
inputs:
mavenPomFile: 'pom.xml'
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: FT other modules
inputs:
mavenPomFile: 'pom.xml'
mavenOptions: '-Xmx4g'
- job: IT
displayName: IT modules
- timeoutInMinutes: '120'
+ timeoutInMinutes: '150'
steps:
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: maven install
inputs:
mavenPomFile: 'pom.xml'
options: $(MVN_OPTS_INSTALL) -Pintegration-tests
publishJUnitResults: false
jdkVersionOption: '1.8'
- - task: Maven@3
+ - task: Maven@3.205.1
displayName: UT integ-test
inputs:
mavenPomFile: 'pom.xml'
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
-hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
-hoodie.datasource.hive_sync.partition_fields=partition
\ No newline at end of file
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.partition_fields=partition
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
--- /dev/null
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dag_name: unit-test-cow-dag
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+ first_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 2
+ num_records_insert: 100
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 100
+ type: InsertNode
+ deps: first_insert
+ third_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 100
+ type: InsertNode
+ deps: second_insert
+ first_upsert:
+ config:
+ record_size: 70000
+ num_partitions_upsert: 1
+ repeat_count: 1
+ num_records_upsert: 100
+ type: UpsertNode
+ deps: third_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_upsert
+ first_presto_query:
+ config:
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 400
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: first_hive_sync
+# first_trino_query:
+# config:
+# trino_queries:
+# query1: "select count(*) from testdb1.table1"
+# result1: 300
+# query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
+# result2: 0
+# type: TrinoQueryNode
+# deps: first_presto_query
\ No newline at end of file
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
-import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
- option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
- option(HiveSyncConfig.HIVE_USER.key(), "hive").
- option(HiveSyncConfig.HIVE_PASS.key(), "hive").
- option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+ option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+ option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
+ option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
+ option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
- option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
- option(HiveSyncConfig.HIVE_USER.key(), "hive").
- option(HiveSyncConfig.HIVE_PASS.key(), "hive").
- option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+ option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+ option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
+ option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
+ option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true").
option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi-hadoop-docker</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-aws</artifactId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<name>hudi-aws</name>
<packaging>jar</packaging>
<!-- Logging -->
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
</dependency>
<!-- Hadoop -->
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.Partition;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateTableRequest;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.stream.Collectors;
import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
-import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
+import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
/**
*
* @Experimental
*/
-public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
+public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
private final AWSGlue awsGlue;
private final String databaseName;
- public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
- super(syncConfig, hadoopConf, fs);
+ public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
+ super(config);
this.awsGlue = AWSGlueClientBuilder.standard().build();
- this.databaseName = syncConfig.databaseName;
+ this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
}
@Override
StorageDescriptor sd = table.getStorageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
- String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+ String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
StorageDescriptor sd = table.getStorageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
- String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+ String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
sd.setLocation(fullPartitionPath);
PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
*/
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
- if (nonEmpty(tableProperties)) {
+ if (isNullOrEmpty(tableProperties)) {
return;
}
try {
- updateTableParameters(awsGlue, databaseName, tableName, tableProperties, true);
+ updateTableParameters(awsGlue, databaseName, tableName, tableProperties, false);
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e);
}
}
@Override
- public void updateTableDefinition(String tableName, MessageType newSchema) {
+ public void updateTableSchema(String tableName, MessageType newSchema) {
// ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
- boolean cascade = syncConfig.partitionFields.size() > 0;
+ boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
try {
Table table = getTable(awsGlue, databaseName, tableName);
- Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
- List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
- String keyType = getPartitionKeyType(newSchemaMap, key);
- return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
- }).collect(Collectors.toList());
+ Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
+ List<Column> newColumns = getColumnsFromSchema(newSchemaMap);
StorageDescriptor sd = table.getStorageDescriptor();
sd.setColumns(newColumns);
}
}
- @Override
- public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
- throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
- }
-
- @Override
- public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
- throw new UnsupportedOperationException("Not supported: `updateTableComments`");
- }
-
- @Override
- public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
- throw new UnsupportedOperationException("Not supported: `updateTableComments`");
- }
-
@Override
public void createTable(String tableName,
MessageType storageSchema,
}
CreateTableRequest request = new CreateTableRequest();
Map<String, String> params = new HashMap<>();
- if (!syncConfig.createManagedTable) {
+ if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
params.put("EXTERNAL", "TRUE");
}
params.putAll(tableProperties);
try {
- Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+ Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
- List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
- for (String key : mapSchema.keySet()) {
- String keyType = getPartitionKeyType(mapSchema, key);
- Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
- // In Glue, the full schema should exclude the partition keys
- if (!syncConfig.partitionFields.contains(key)) {
- schemaWithoutPartitionKeys.add(column);
- }
- }
+ List<Column> schemaWithoutPartitionKeys = getColumnsFromSchema(mapSchema);
// now create the schema partition
- List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
+ List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
String keyType = getPartitionKeyType(mapSchema, partitionKey);
return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
serdeProperties.put("serialization.format", "1");
storageDescriptor
.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
- .withLocation(s3aToS3(syncConfig.basePath))
+ .withLocation(s3aToS3(getBasePath()))
.withInputFormat(inputFormatClass)
.withOutputFormat(outputFormatClass)
.withColumns(schemaWithoutPartitionKeys);
}
@Override
- public Map<String, String> getTableSchema(String tableName) {
+ public Map<String, String> getMetastoreSchema(String tableName) {
try {
// GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
}
}
- @Override
- public boolean doesTableExist(String tableName) {
- return tableExists(tableName);
- }
-
@Override
public boolean tableExists(String tableName) {
GetTableRequest request = new GetTableRequest()
@Override
public void updateLastCommitTimeSynced(String tableName) {
- if (!activeTimeline.lastInstant().isPresent()) {
+ if (!getActiveTimeline().lastInstant().isPresent()) {
LOG.warn("No commit in active timeline.");
return;
}
- final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp();
+ final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp();
try {
updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false);
} catch (Exception e) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}
+ private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
+ List<Column> cols = new ArrayList<>();
+ for (String key : mapSchema.keySet()) {
+ // In Glue, the full schema should exclude the partition keys
+ if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
+ String keyType = getPartitionKeyType(mapSchema, key);
+ Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
+ cols.add(column);
+ }
+ }
+ return cols;
+ }
+
private enum TableType {
MANAGED_TABLE,
EXTERNAL_TABLE,
package org.apache.hudi.aws.sync;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Properties;
/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
* to enable querying via Glue ETLs, Athena etc.
- *
+ * <p>
* Extends HiveSyncTool since most logic is similar to Hive syncing,
 * except using a different client {@link AWSGlueCatalogSyncClient} that implements
* the necessary functionality using Glue APIs.
*/
public class AwsGlueCatalogSyncTool extends HiveSyncTool {
- public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
- super(props, new HiveConf(conf, HiveConf.class), fs);
- }
-
- public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
- super(hiveSyncConfig, hiveConf, fs);
+ public AwsGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
+ super(props, hadoopConf);
}
@Override
- protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
- hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
+ protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
+ syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
}
public static void main(String[] args) {
- // parse the params
- final HiveSyncConfig cfg = new HiveSyncConfig();
- JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
+ final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
+ JCommander cmd = JCommander.newBuilder().addObject(params).build();
+ cmd.parse(args);
+ if (params.isHelp()) {
cmd.usage();
- System.exit(1);
+ System.exit(0);
}
- FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
- HiveConf hiveConf = new HiveConf();
- hiveConf.addResource(fs.getConf());
- new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
+ new AwsGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
}
}
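
For reference, a minimal sketch (not part of this change) of driving the refactored tool programmatically through its new `Properties`-based constructor; the sync keys are the `HoodieSyncConfig` ones used elsewhere in this diff, while `META_SYNC_BASE_PATH` and the example values are assumptions for illustration only.

```java
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class GlueSyncExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Keys taken from HoodieSyncConfig as used in this change; the base-path key
    // (META_SYNC_BASE_PATH) and all values here are illustrative assumptions.
    props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "s3a://bucket/hudi/stock_ticks");
    props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default");
    props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks");
    props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");

    // Same path as main(): construct with raw properties and a Hadoop Configuration, then sync.
    new AwsGlueCatalogSyncTool(props, new Configuration()).syncHoodieTable();
  }
}
```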
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<!-- Logging -->
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
- @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "true",
+ @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "false",
help = "Enabling marker based rollback") final String rollbackUsingMarkers)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
public String unscheduleCompactFile(
@CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
- @CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath,
+ @CliOption(key = "partitionPath", unspecifiedDefaultValue = "", help = "partition path") final String partitionPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
@CliCommand(value = "show fsview all", help = "Show entire file-system view")
public String showAllFileSlices(
@CliOption(key = {"pathRegex"}, help = "regex to select files, eg: 2016/08/02",
- unspecifiedDefaultValue = "*/*/*") String globRegex,
+ unspecifiedDefaultValue = "") String globRegex,
@CliOption(key = {"baseFileOnly"}, help = "Only display base files view",
unspecifiedDefaultValue = "false") boolean baseFileOnly,
@CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
+ globRegex = globRegex == null ? "" : globRegex;
+    // TODO: There is a bug in Spring Shell: if we pass */*/* to pathRegex, the trailing '/' is lost and pathRegex becomes */**
+ if (globRegex.endsWith("**")) {
+ globRegex = globRegex.replace("**", "*/*");
+ }
+
HoodieTableFileSystemView fsView = buildFileSystemView(globRegex, maxInstant, baseFileOnly, includeMaxInstant,
includeInflight, excludeCompaction);
List<Comparable[]> rows = new ArrayList<>();
@CliCommand(value = "show fsview latest", help = "Show latest file-system view")
public String showLatestFileSlices(
- @CliOption(key = {"partitionPath"}, help = "A valid partition path", mandatory = true) String partition,
+ @CliOption(key = {"partitionPath"}, help = "A valid partition path", unspecifiedDefaultValue = "") String partition,
@CliOption(key = {"baseFileOnly"}, help = "Only display base file view",
unspecifiedDefaultValue = "false") boolean baseFileOnly,
@CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
@CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
@CliOption(key = "tableType", mandatory = true, help = "Table type") final String tableType,
@CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
- @CliOption(key = "partitionPathField", mandatory = true,
+ @CliOption(key = "partitionPathField", unspecifiedDefaultValue = "",
help = "Partition path field name") final String partitionPathField,
@CliOption(key = {"parallelism"}, mandatory = true,
help = "Parallelism for hoodie insert") final String parallelism,
* CLI command to display sync options.
*/
@Component
-public class HoodieSyncCommand implements CommandMarker {
+public class HoodieSyncValidateCommand implements CommandMarker {
@CliCommand(value = "sync validate", help = "Validate the sync by counting the number of records")
public String validateSync(
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
@CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
public String listFiles(
- @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
+ @CliOption(key = {"partition"}, help = "Name of the partition to list files", unspecifiedDefaultValue = "") final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}
+ Path partitionPath = new Path(HoodieCLI.basePath);
+ if (!StringUtils.isNullOrEmpty(partition)) {
+ partitionPath = new Path(HoodieCLI.basePath, partition);
+ }
+
HoodieTimer timer = new HoodieTimer().startTimer();
- FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
LOG.debug("Took " + timer.endTimer() + " ms");
final List<Comparable[]> rows = new ArrayList<>();
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(rollbackUsingMarkers)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
-import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
-import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
+import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
- val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+ val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
@Tag("functional")
public class TestFileSystemViewCommand extends CLIFunctionalTestHarness {
+ private String nonpartitionedTablePath;
+ private String partitionedTablePath;
private String partitionPath;
- private SyncableFileSystemView fsView;
+ private SyncableFileSystemView nonpartitionedFsView;
+ private SyncableFileSystemView partitionedFsView;
@BeforeEach
public void init() throws IOException {
+ createNonpartitionedTable();
+ createPartitionedTable();
+ }
+
+ private void createNonpartitionedTable() throws IOException {
HoodieCLI.conf = hadoopConf();
// Create table and connect
- String tableName = tableName();
- String tablePath = tablePath(tableName);
+ String nonpartitionedTableName = "nonpartitioned_" + tableName();
+ nonpartitionedTablePath = tablePath(nonpartitionedTableName);
new TableCommand().createTable(
- tablePath, tableName,
- "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+ nonpartitionedTablePath, nonpartitionedTableName,
+ "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
+ Files.createDirectories(Paths.get(nonpartitionedTablePath));
+
+ // Generate 2 commits
+ String commitTime1 = "3";
+ String commitTime2 = "4";
+
+ String fileId1 = UUID.randomUUID().toString();
+
+    // Write data files and log files
+ String testWriteToken = "2-0-2";
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeBaseFileName(commitTime1, testWriteToken, fileId1)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0, testWriteToken)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeBaseFileName(commitTime2, testWriteToken, fileId1)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+ .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));
+
+ // Write commit files
+ Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+ Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime2 + ".commit"));
+
+ // Reload meta client and create fsView
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ nonpartitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+ }
+
+ private void createPartitionedTable() throws IOException {
+ HoodieCLI.conf = hadoopConf();
+
+ // Create table and connect
+ String partitionedTableName = "partitioned_" + tableName();
+ partitionedTablePath = tablePath(partitionedTableName);
+ new TableCommand().createTable(
+ partitionedTablePath, partitionedTableName,
+ "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
partitionPath = HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- String fullPartitionPath = Paths.get(tablePath, partitionPath).toString();
+ String fullPartitionPath = Paths.get(partitionedTablePath, partitionPath).toString();
Files.createDirectories(Paths.get(fullPartitionPath));
// Generate 2 commits
.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));
// Write commit files
- Files.createFile(Paths.get(tablePath, ".hoodie", commitTime1 + ".commit"));
- Files.createFile(Paths.get(tablePath, ".hoodie", commitTime2 + ".commit"));
+ Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+ Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime2 + ".commit"));
// Reload meta client and create fsView
metaClient = HoodieTableMetaClient.reload(metaClient);
- fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+ partitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
}
/**
@Test
public void testShowCommits() {
// Test default show fsview all
- CommandResult cr = shell().executeCommand("show fsview all");
+ CommandResult cr = shell().executeCommand("show fsview all --pathRegex */*/*");
assertTrue(cr.isSuccess());
// Get all file groups
- Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+ Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);
List<Comparable[]> rows = new ArrayList<>();
fileGroups.forEach(fg -> fg.getAllFileSlices().forEach(fs -> {
@Test
public void testShowCommitsWithSpecifiedValues() {
// Test command with options, baseFileOnly and maxInstant is 2
- CommandResult cr = shell().executeCommand("show fsview all --baseFileOnly true --maxInstant 2");
+ CommandResult cr = shell().executeCommand("show fsview all --pathRegex */*/* --baseFileOnly true --maxInstant 2");
assertTrue(cr.isSuccess());
List<Comparable[]> rows = new ArrayList<>();
- Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+ Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);
// Only get instant 1, since maxInstant was specified 2
fileGroups.forEach(fg -> fg.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals("1")).forEach(fs -> {
assertEquals(expected, got);
}
- /**
- * Test case for command 'show fsview latest'.
- */
- @Test
- public void testShowLatestFileSlices() {
- // Test show with partition path '2016/03/15'
- CommandResult cr = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
- assertTrue(cr.isSuccess());
-
- Stream<FileSlice> fileSlice = fsView.getLatestFileSlices(partitionPath);
-
+ private List<Comparable[]> fileSlicesToCRList(Stream<FileSlice> fileSlice, String partitionPath) {
List<Comparable[]> rows = new ArrayList<>();
fileSlice.forEach(fs -> {
int idx = 0;
.collect(Collectors.toList()).toString();
rows.add(row);
});
+ return rows;
+ }
+  /**
+ * Test case for command 'show fsview latest'.
+ */
+ @Test
+ public void testShowLatestFileSlices() throws IOException {
Function<Object, String> converterFunction =
entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_BASE_UNSCHEDULED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_SCHEDULED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_UNSCHEDULED);
- String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
- expected = removeNonWordAndStripSpace(expected);
- String got = removeNonWordAndStripSpace(cr.getResult().toString());
- assertEquals(expected, got);
+
+ // Test show with partition path '2016/03/15'
+ new TableCommand().connect(partitionedTablePath, null, false, 0, 0, 0);
+ CommandResult partitionedTableCR = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
+ assertTrue(partitionedTableCR.isSuccess());
+
+ Stream<FileSlice> partitionedFileSlice = partitionedFsView.getLatestFileSlices(partitionPath);
+
+ List<Comparable[]> partitionedRows = fileSlicesToCRList(partitionedFileSlice, partitionPath);
+ String partitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, partitionedRows);
+ partitionedExpected = removeNonWordAndStripSpace(partitionedExpected);
+ String partitionedResults = removeNonWordAndStripSpace(partitionedTableCR.getResult().toString());
+ assertEquals(partitionedExpected, partitionedResults);
+
+ // Test show for non-partitioned table
+ new TableCommand().connect(nonpartitionedTablePath, null, false, 0, 0, 0);
+ CommandResult nonpartitionedTableCR = shell().executeCommand("show fsview latest");
+ assertTrue(nonpartitionedTableCR.isSuccess());
+
+ Stream<FileSlice> nonpartitionedFileSlice = nonpartitionedFsView.getLatestFileSlices("");
+
+ List<Comparable[]> nonpartitionedRows = fileSlicesToCRList(nonpartitionedFileSlice, "");
+
+ String nonpartitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, nonpartitionedRows);
+ nonpartitionedExpected = removeNonWordAndStripSpace(nonpartitionedExpected);
+ String nonpartitionedResults = removeNonWordAndStripSpace(nonpartitionedTableCR.getResult().toString());
+ assertEquals(nonpartitionedExpected, nonpartitionedResults);
}
}
<parent>
<artifactId>hudi-client</artifactId>
<groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-client-common</artifactId>
- <version>0.12.0-SNAPSHOT</version>
+ <version>0.13.0-SNAPSHOT</version>
<name>hudi-client-common</name>
<packaging>jar</packaging>
<!-- Logging -->
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
</dependency>
<!-- Parquet -->
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
- if (!historySchemaStr.isEmpty()) {
- InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
- SerDeHelper.parseSchemas(historySchemaStr));
+ if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
+ InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
- InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
+ if (historySchemaStr.isEmpty()) {
+ internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
+ internalSchema.setSchemaId(Long.parseLong(instantTime));
+ } else {
+ internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+ SerDeHelper.parseSchemas(historySchemaStr));
+ }
+ InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
- schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+ schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr);
} else {
evolvedSchema.setSchemaId(Long.parseLong(instantTime));
String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
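
A minimal sketch of exercising the new reconciliation branch above; only the `HoodieCommonConfig.RECONCILE_SCHEMA` key is taken from this change, and the `withProps` builder call is assumed to accept raw properties.

```java
import java.util.Properties;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ReconcileSchemaConfigExample {
  public static HoodieWriteConfig buildConfig(String basePath, String writeSchema) {
    Properties props = new Properties();
    // Key referenced in the branch above; when enabled, the writer takes the schema
    // reconciliation path even if no history schema has been persisted yet.
    props.setProperty(HoodieCommonConfig.RECONCILE_SCHEMA.key(), "true");

    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(writeSchema)
        .withProps(props) // assumed builder helper for passing raw properties
        .build();
  }
}
```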
* @param writeOperationType
* @param metaClient
*/
- protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+ public void preWrite(String instantTime, WriteOperationType writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
+ ValidationUtils.checkArgument(!config.shouldArchiveBeyondSavepoint(), "Restore is not supported when " + HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key()
+ + " is enabled");
restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}
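
For context, a minimal sketch of turning on the behaviour that the guard above rejects during restore; only the `ARCHIVE_BEYOND_SAVEPOINT` key comes from this change, and the builder wiring is an assumption.

```java
import java.util.Properties;

import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ArchiveBeyondSavepointExample {
  public static HoodieWriteConfig buildConfig(String basePath) {
    Properties props = new Properties();
    // Key referenced in the guard above; when set, archival may proceed past savepointed
    // commits, which is why restore-to-savepoint is disallowed in this mode.
    props.setProperty(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key(), "true");

    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withProps(props) // assumed builder helper for passing raw properties
        .build();
  }
}
```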
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
- Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
- instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+ if (!instantsToRollback.isEmpty()) {
+ Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
+ instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
- rollbackFailedWrites(pendingRollbacks, true);
+ rollbackFailedWrites(pendingRollbacks, true);
+ }
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
.run(HoodieTableVersion.current(), instantTime.orElse(null));
this.random = new Random(RANDOM_SEED);
}
+ public boolean isTrackingSuccessfulWrites() {
+ return trackSuccessRecords;
+ }
+
public void markSuccess(String recordKey) {
if (trackSuccessRecords) {
this.successRecordKeys.add(recordKey);
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* Archiver to bound the growth of files under .hoodie meta path.
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterInflights().firstInstant();
- // We cannot have any holes in the commit timeline. We cannot archive any commits which are
- // made after the first savepoint present.
+ // NOTE: We cannot have any holes in the commit timeline.
+ // We cannot archive any commits which are made after the first savepoint present,
+ // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+ Set<String> savepointTimestamps = table.getSavepointTimestamps();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
// For Merge-On-Read table, inline or async compaction is enabled
// We need to make sure that there are enough delta commits in the active timeline
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
.filter(s -> {
- // if no savepoint present, then don't filter
- return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+ if (config.shouldArchiveBeyondSavepoint()) {
+ // skip savepoint commits and proceed further
+ return !savepointTimestamps.contains(s.getTimestamp());
+ } else {
+            // if a savepoint is present, stop at the first savepoint commit; otherwise don't filter anything
+ return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+ }
}).filter(s -> {
// Ensure commits >= oldest pending compaction commit is retained
return oldestPendingCompactionAndReplaceInstant
- .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+ .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}).filter(s -> {
// We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
// get archived, i.e, instants after the oldestInflight are retained on the timeline
if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
return oldestInflightCommitInstant.map(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+ compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}
return true;
}).filter(s ->
oldestInstantToRetainForCompaction.map(instantToRetain ->
- HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+ compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
);
-
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
return Stream.empty();
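To make the two filtering policies above easier to follow, here is a self-contained sketch of the candidate decision. It is not the Hudi implementation; instant times are compared lexicographically, which matches the timeline's timestamp ordering:

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

// Minimal sketch: how the two savepoint policies decide whether a commit timestamp
// is an archival candidate.
final class ArchivalCandidateSketch {

  // hoodie.archive.beyond.savepoint = true: only the savepointed commits themselves are protected.
  static boolean candidateWhenArchivingBeyondSavepoint(String commitTs, Set<String> savepointTimestamps) {
    return !savepointTimestamps.contains(commitTs);
  }

  // Default policy: everything at or after the first savepoint is protected.
  static boolean candidateWithDefaultPolicy(String commitTs, Optional<String> firstSavepointTs) {
    return firstSavepointTs.map(sp -> commitTs.compareTo(sp) < 0).orElse(true);
  }

  public static void main(String[] args) {
    Set<String> savepoints = Collections.singleton("20220103000000");
    // A commit made after the savepoint is archivable only when the new flag is on.
    System.out.println(candidateWhenArchivingBeyondSavepoint("20220104000000", savepoints)); // true
    System.out.println(candidateWithDefaultPolicy("20220104000000", Optional.of("20220103000000"))); // false
  }
}
```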
instants = Stream.empty();
} else {
LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
- instants = instants.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+ instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN,
latestCompactionTime.get()));
}
} catch (Exception e) {
}
}
- // If this is a metadata table, do not archive the commits that live in data set
- // active timeline. This is required by metadata table,
- // see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
- Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
- if (earliestActiveDatasetCommit.isPresent()) {
- instants = instants.filter(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
+ Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
+
+ if (config.shouldArchiveBeyondSavepoint()) {
+          // Archival and savepoints can interact to leave holes in the data timeline.
+          // So, the first non-savepoint commit in the data timeline is considered the beginning of the active timeline.
+ Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
+ if (firstNonSavepointCommit.isPresent()) {
+ String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
+ instants = instants.filter(instant ->
+ compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
+ }
+ } else {
+ // Do not archive the commits that live in data set active timeline.
+ // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
+ if (earliestActiveDatasetCommit.isPresent()) {
+ instants = instants.filter(instant ->
+ compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
+ }
}
}
}
List<HoodieInstant> instantsToBeDeleted =
- instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
+ instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(),
LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
for (HoodieInstant deleteInstant : instantsToBeDeleted) {
.serverPort(writeConfig.getEmbeddedTimelineServerPort())
.numThreads(writeConfig.getEmbeddedTimelineServerThreads())
.compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
- .async(writeConfig.getEmbeddedTimelineServerUseAsync())
- .refreshTimelineBasedOnLatestCommit(writeConfig.isRefreshTimelineServerBasedOnLatestCommit());
+ .async(writeConfig.getEmbeddedTimelineServerUseAsync());
// Only passing marker-related write configs to timeline server
// if timeline-server-based markers are used.
if (writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {
.withRemoteServerHost(hostAddr)
.withRemoteServerPort(serverPort)
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
+ .withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
+ .withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
+ .withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
+ .withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
+ .withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
.build();
}
}
}
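The new retry-related builder methods can be exercised on their own. A sketch using only the methods visible in this change; the host, port, and retry values are illustrative, and anything not set falls back to the builder defaults:

```java
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;

// Sketch: enable client-side retries when talking to a remote timeline server.
final class RemoteViewRetrySketch {
  static FileSystemViewStorageConfig buildRetryingViewConfig(String host, int port) {
    return FileSystemViewStorageConfig.newBuilder()
        .withRemoteServerHost(host)
        .withRemoteServerPort(port)
        .withRemoteTimelineClientRetry(true)             // turn retries on
        .withRemoteTimelineClientMaxRetryNumbers(3)      // give up after 3 retries
        .withRemoteTimelineInitialRetryIntervalMs(100L)  // start with a 100 ms backoff
        .withRemoteTimelineClientMaxRetryIntervalMs(2000L)
        .build();
  }
}
```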
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
public Option<HoodieInstant> getLastCompletedTransactionOwner() {
return lastCompletedTxnOwnerInstant;
}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+
+/**
+ * A FileSystem based lock. This {@link LockProvider} implementation allows locking table operations
+ * using DFS. Users may need to manually clean up the lock path if the write client crashes and never runs again.
+ * NOTE: This only works for DFS with atomic create/delete operations.
+ */
+public class FileSystemBasedLockProvider implements LockProvider<String>, Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class);
+
+ private static final String LOCK_FILE_NAME = "lock";
+
+ private final int lockTimeoutMinutes;
+ private final transient FileSystem fs;
+ private final transient Path lockFile;
+ protected LockConfiguration lockConfiguration;
+
+ public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
+ if (StringUtils.isNullOrEmpty(lockDirectory)) {
+ lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
+ + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
+ }
+ this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
+ this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
+ this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
+ }
+
+ @Override
+ public void close() {
+ synchronized (LOCK_FILE_NAME) {
+ try {
+ fs.delete(this.lockFile, true);
+ } catch (IOException e) {
+ throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
+ }
+ }
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ try {
+ synchronized (LOCK_FILE_NAME) {
+ // Check whether lock is already expired, if so try to delete lock file
+ if (fs.exists(this.lockFile)) {
+ if (checkIfExpired()) {
+ fs.delete(this.lockFile, true);
+            LOG.warn("Deleted expired lock file: " + this.lockFile);
+ } else {
+ return false;
+ }
+ }
+ acquireLock();
+ return fs.exists(this.lockFile);
+ }
+ } catch (IOException | HoodieIOException e) {
+ LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+ return false;
+ }
+ }
+
+ @Override
+ public void unlock() {
+ synchronized (LOCK_FILE_NAME) {
+ try {
+ if (fs.exists(this.lockFile)) {
+ fs.delete(this.lockFile, true);
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
+ }
+ }
+ }
+
+ @Override
+ public String getLock() {
+ return this.lockFile.toString();
+ }
+
+ private boolean checkIfExpired() {
+ if (lockTimeoutMinutes == 0) {
+ return false;
+ }
+ try {
+ long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime();
+ if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000L) {
+ return true;
+ }
+ } catch (IOException | HoodieIOException e) {
+ LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e);
+ }
+ return false;
+ }
+
+ private void acquireLock() {
+ try {
+ fs.create(this.lockFile, false).close();
+ } catch (IOException e) {
+ throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+ }
+ }
+
+ protected String generateLogStatement(LockState state) {
+ return StringUtils.join(state.name(), " lock at: ", getLock());
+ }
+
+ private void checkRequiredProps(final LockConfiguration config) {
+ ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null
+ || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null);
+ ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0);
+ }
+}
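A usage sketch for the new provider, under the assumptions that `LockConfiguration` is constructed from `TypedProperties` and that the lock directory lives on a DFS with atomic create/delete, as the Javadoc above requires; the path and expiry values are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;

import java.util.concurrent.TimeUnit;

import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;

// Sketch: acquire and release the filesystem-based lock around a table operation.
public class FsLockProviderSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.put(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/hudi_table/.hoodie/.locks"); // illustrative lock directory
    props.put(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "10");                           // treat locks older than 10 minutes as expired

    FileSystemBasedLockProvider provider =
        new FileSystemBasedLockProvider(new LockConfiguration(props), new Configuration());
    if (provider.tryLock(0, TimeUnit.SECONDS)) {
      try {
        // ... perform the guarded table operation ...
      } finally {
        provider.unlock();
      }
    }
  }
}
```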
if (retryCount >= maxRetries) {
throw new HoodieLockException("Unable to acquire lock, lock object ", e);
}
+ try {
+ Thread.sleep(maxWaitTimeInMs);
+ } catch (InterruptedException ex) {
+ // ignore InterruptedException here
+ }
} finally {
retryCount++;
}
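The retry handling above boils down to bounded attempts with a fixed wait between tries. A generic sketch of that pattern; the names are illustrative and not the LockManager API:

```java
import java.util.function.BooleanSupplier;

// Sketch: retry an operation up to maxRetries times, sleeping waitMs between attempts.
final class RetrySketch {
  static boolean tryWithRetries(BooleanSupplier attempt, int maxRetries, long waitMs) {
    for (int retryCount = 0; retryCount <= maxRetries; retryCount++) {
      if (attempt.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(waitMs); // back off before the next attempt
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }
}
```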
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Archival related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "Archival Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Configurations that control archival.")
+public class HoodieArchivalConfig extends HoodieConfig {
+
+ public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+ .key("hoodie.archive.automatic")
+ .defaultValue("true")
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits once we cross the configured maximum number of commits to keep."
+          + " It's recommended to keep this enabled, to ensure the number of active commits is bounded.");
+
+ public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
+ .key("hoodie.archive.async")
+ .defaultValue("false")
+ .sinceVersion("0.11.0")
+      .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
+          + "When enabled, the archiver runs asynchronously with writing, which can speed up overall write performance.");
+
+ public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
+ .key("hoodie.keep.max.commits")
+ .defaultValue("30")
+      .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to"
+          + " keep the metadata overhead constant, even as the table size grows."
+          + " This config controls the maximum number of instants to retain in the active timeline.");
+
+ public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
+ .key("hoodie.archive.delete.parallelism")
+ .defaultValue(100)
+ .withDocumentation("Parallelism for deleting archived hoodie commits.");
+
+ public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
+ .key("hoodie.keep.min.commits")
+ .defaultValue("20")
+      .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
+          + " instants to retain in the active timeline.");
+
+ public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
+ .key("hoodie.commits.archival.batch")
+ .defaultValue(String.valueOf(10))
+      .withDocumentation("Archiving of instants is batched in a best-effort manner, to pack more instants into a single"
+          + " archive log. This config controls that archival batch size.");
+
+ public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
+ .key("hoodie.archive.merge.files.batch.size")
+ .defaultValue(10)
+ .withDocumentation("The number of small archive files to be merged at once.");
+
+ public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+ .key("hoodie.archive.merge.small.file.limit.bytes")
+ .defaultValue(20L * 1024 * 1024)
+      .withDocumentation("This config sets the file size limit below which an archive file is considered a small file and becomes a candidate for merging.");
+
+ public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
+ .key("hoodie.archive.merge.enable")
+ .defaultValue(false)
+      .withDocumentation("When enabled, Hudi will automatically merge several small archive files into a larger one."
+          + " It's useful when the storage scheme doesn't support append operations.");
+
+ public static final ConfigProperty<Boolean> ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
+ .key("hoodie.archive.beyond.savepoint")
+ .defaultValue(false)
+ .sinceVersion("0.12.0")
+      .withDocumentation("If enabled, archival will proceed beyond the savepoint, skipping savepointed commits."
+          + " If disabled, archival will stop at the earliest savepoint commit.");
+
+ /**
+ * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
+ /**
+ * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
+ /**
+ * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
+ */
+ @Deprecated
+ public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
+ /**
+ * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
+ /**
+ * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_MIN_COMMITS_TO_KEEP = MIN_COMMITS_TO_KEEP.defaultValue();
+ /**
+ * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue();
+
+ private HoodieArchivalConfig() {
+ super();
+ }
+
+ public static HoodieArchivalConfig.Builder newBuilder() {
+ return new HoodieArchivalConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final HoodieArchivalConfig archivalConfig = new HoodieArchivalConfig();
+
+ public HoodieArchivalConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.archivalConfig.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public HoodieArchivalConfig.Builder fromProperties(Properties props) {
+ this.archivalConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withAutoArchive(Boolean autoArchive) {
+ archivalConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withAsyncArchive(Boolean asyncArchive) {
+ archivalConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
+ archivalConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
+ archivalConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveMergeFilesBatchSize(int number) {
+ archivalConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveMergeSmallFileLimit(long size) {
+ archivalConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveMergeEnable(boolean enable) {
+ archivalConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
+ archivalConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize) {
+ archivalConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
+ return this;
+ }
+
+ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
+ archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint));
+ return this;
+ }
+
+ public HoodieArchivalConfig build() {
+ archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
+ return archivalConfig;
+ }
+ }
+}
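Finally, a sketch of building the new config with archival beyond savepoint enabled, using only the builder methods defined above; the commit bounds are illustrative, and attaching the result to the write config is left out since that wiring is not shown here:

```java
import org.apache.hudi.config.HoodieArchivalConfig;

// Sketch: keep 20-30 instants on the active timeline and let archival skip over savepointed commits.
public class ArchivalConfigSketch {
  public static void main(String[] args) {
    HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
        .archiveCommitsWith(20, 30)
        .withArchiveBeyondSavepoint(true)
        .build();
    System.out.println(archivalConfig.getProps()); // inspect the resulting properties
  }
}
```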