[MINOR] improve flink dummySink's parallelism (#6325)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 4160c34b0ce6494325f93f6660f45761f3318e7a..119c61f84bc19d077f044ec27643c78ab348334b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,14 +20,14 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.HoodieBaseRelation.projectReader
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
-import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 
@@ -52,6 +52,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
                            globPaths: Seq[Path])
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
 
+  case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit
+
   override type FileSplit = HoodieBaseFileSplit
 
   // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract
@@ -59,14 +61,10 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   //                 For more details please check HUDI-4161
   // NOTE: This override has to mirror semantic of whenever this Relation is converted into [[HadoopFsRelation]],
   //       which is currently done for all cases, except when Schema Evolution is enabled
-  override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
-    val enableSchemaOnRead = !internalSchema.isEmptySchema
-    !enableSchemaOnRead
-  }
+  override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
+    internalSchemaOpt.isEmpty
 
-  override lazy val mandatoryFields: Seq[String] =
-  // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
-    Seq(recordKeyField)
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
     super.imbueConfigs(sqlContext)
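
For context on the rewritten flag in the hunk above: assuming `internalSchemaOpt` is the base relation's `internalSchema` wrapped as an Option that is empty exactly when the schema is empty (an assumption, since its definition is not shown in this diff), the new one-liner is equivalent to the removed two-line form. A minimal sketch of that equivalence; the object and parameter names are illustrative only:

object SchemaOnReadFlagSketch {
  // Assumption (not from this diff): internalSchemaOpt is None exactly when the
  // internal schema is empty, i.e. when schema-on-read (schema evolution) is disabled.
  def shouldExtractPartitionValuesFromPartitionPath(internalSchemaOpt: Option[AnyRef]): Boolean = {
    // Removed form:
    //   val enableSchemaOnRead = !internalSchema.isEmptySchema
    //   !enableSchemaOnRead
    // which reduces to internalSchema.isEmptySchema, i.e. internalSchemaOpt.isEmpty:
    internalSchemaOpt.isEmpty
  }
}
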
@@ -74,24 +72,36 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   }
 
   protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
-                                    partitionSchema: StructType,
-                                    dataSchema: HoodieTableSchema,
+                                    tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
-                                    filters: Array[Filter]): HoodieUnsafeRDD = {
+                                    requestedColumns: Array[String],
+                                    filters: Array[Filter]): RDD[InternalRow] = {
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
 
     val baseFileReader = createBaseFileReader(
       spark = sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
+      requiredDataSchema = requiredDataSchema,
       filters = filters,
       options = optParams,
       // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
       //       to configure Parquet reader appropriately
-      hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
+      hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
     )
 
-    new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
+    // NOTE: In some cases the schema of the reader's output (the reader's schema) might not match the schema
+    //       expected by the caller. This could occur, for example, when the requested schema contains partition
+    //       columns which might not be persisted w/in the data file, but would instead be parsed from the
+    //       partition path. In that case the output of the file-reader will have a different ordering of the fields
+    //       than the original required schema (for more details please check out the [[ParquetFileFormat]] impl).
+    //       In that case we have to project the rows from the file-reader's schema back into the one expected by the caller
+    val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)
+
+    // SPARK-37273: the FileScanRDD constructor changed in Spark 3.3
+    sparkAdapter.createHoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits.map(_.filePartition), requiredSchema.structTypeSchema)
+      .asInstanceOf[HoodieUnsafeRDD]
   }
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
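
The NOTE in the hunk above explains why the reader's output may need to be re-ordered into the caller's required schema. Below is a hedged sketch of such a projection, assuming nothing about Hudi's actual `projectReader` beyond what the comment states; the object name `ReaderProjectionSketch`, the helper `projectRow`, and the choice of Catalyst's `UnsafeProjection` are illustrative, not the real implementation:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types.StructType

object ReaderProjectionSketch {
  // Re-order the fields emitted by a file reader (readerSchema) into the field order
  // the caller expects (requiredSchema), matching fields by name.
  def projectRow(readerSchema: StructType, requiredSchema: StructType): InternalRow => InternalRow = {
    val boundRefs = requiredSchema.fields.map { field =>
      BoundReference(readerSchema.fieldIndex(field.name), field.dataType, field.nullable)
    }
    val projection = UnsafeProjection.create(boundRefs.toSeq)
    row => projection(row)
  }
}

The subsequent `sparkAdapter.createHoodieFileScanRDD` call is the version shim for SPARK-37273 mentioned in the comment: the `FileScanRDD` constructor changed in Spark 3.3, so construction is delegated to the version-specific adapter instead of being done inline.
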
@@ -124,16 +134,6 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
-      val (tableFileFormat, formatClassName) =
-        metaClient.getTableConfig.getBaseFileFormat match {
-          case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
-          case HoodieFileFormat.PARQUET =>
-            // We're delegating to Spark to append partition values to every row only in cases
-            // when these corresponding partition-values are not persisted w/in the data file itself
-            val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
-            (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
-        }
-
     if (globPaths.isEmpty) {
       // NOTE: There are currently 2 ways partition values could be fetched:
       //          - Source columns (producing the values used for physical partitioning) will be read
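
The removed block above picked Spark's `FileFormat` from the Hudi table config; the new code references `fileFormat` and `fileFormatClassName` instead, which presumably carry the same selection elsewhere in the relation hierarchy (not shown in this diff). A hedged sketch of that kind of selection, using plain Spark formats rather than Hudi's adapter-created one; the object and method names are illustrative:

import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat

object FileFormatSelectionSketch {
  // Map the Hudi base file format to a Spark FileFormat and its short name. The removed code
  // used sparkAdapter.createHoodieParquetFileFormat(...) for Parquet; plain ParquetFileFormat
  // is used here only to keep the sketch self-contained.
  def selectFileFormat(baseFileFormat: HoodieFileFormat): (FileFormat, String) =
    baseFileFormat match {
      case HoodieFileFormat.ORC     => (new OrcFileFormat, "orc")
      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
      case other => throw new UnsupportedOperationException(s"Unsupported base file format: $other")
    }
}
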
@@ -157,24 +157,44 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
         partitionSchema = partitionSchema,
         dataSchema = dataSchema,
         bucketSpec = None,
-        fileFormat = tableFileFormat,
+        fileFormat = fileFormat,
         optParams)(sparkSession)
     } else {
       val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
       val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
 
+      // NOTE: Spark is able to infer partitioning values from the partition path only when the Hive-style
+      //       partitioning scheme is used. Therefore, we fall back to reading the table as non-partitioned
+      //       (specifying partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved
+      val partitionColumns: Seq[String] = if (tableConfig.getHiveStylePartitioningEnable.toBoolean) {
+        this.partitionColumns
+      } else {
+        Seq.empty
+      }
+
       DataSource.apply(
         sparkSession = sparkSession,
         paths = extraReadPaths,
-        userSpecifiedSchema = userSchema,
-        className = formatClassName,
-        // Since we're reading the table as just collection of files we have to make sure
-        // we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
-        // while keeping previous versions of the files around as well.
-        //
-        // We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
+        // Here we should specify the latest commit schema as the read schema,
+        // since the table schema might have evolved.
+        userSpecifiedSchema = userSchema.orElse(Some(tableStructSchema)),
+        className = fileFormatClassName,
         options = optParams ++ Map(
-          "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName
+          // Since we're reading the table as just a collection of files, we have to make sure we only
+          // read the latest version of every Hudi file-group, which might have been compacted, clustered, etc.,
+          // while previous versions of the files are still kept around.
+          //
+          // We rely on [[HoodieROTablePathFilter]] to do that filtering
+          "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName,
+
+          // We have to override the [[EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH]] setting, since
+          // the relation might have this setting overridden
+          DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString,
+
+          // NOTE: We have to specify the table's base-path explicitly, since we're requesting Spark to read it as a
+          //       list of globbed paths, which complicates partition discovery for Spark.
+          //       Please check the [[PartitioningAwareFileIndex#basePaths]] comment for more details.
+          PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
         ),
         partitionColumns = partitionColumns
       )
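
The Hive-style partitioning comment above is the key constraint on this glob-path branch: Spark's partition discovery only recovers partition values from `column=value` directory names, and only when it knows where the partition directories start, which is why `BASE_PATH_PARAM` is also passed explicitly. A hypothetical usage sketch showing that behavior with plain Spark; the object name `GlobReadSketch`, the paths, the column names, and the local-session setup are all made up for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex

object GlobReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("glob-read-sketch").getOrCreate()

    // Hive-style layout, e.g. /data/table/region=US/dt=2022-08-01/part-0001.parquet:
    // with an explicit basePath, Spark infers `region` and `dt` as partition columns.
    val df = spark.read
      .option(PartitioningAwareFileIndex.BASE_PATH_PARAM, "/data/table")
      .parquet("/data/table/region=US/dt=2022-08-*")

    // A non-Hive layout such as /data/table/US/2022-08-01/... carries no column names in the path,
    // so nothing can be inferred, which is why the relation falls back to partitionColumns = Seq.empty above.
    df.printSchema()
    spark.stop()
  }
}
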