[SPARK-26580][SQL] remove Scala 2.11 hack for Scala UDF
authorWenchen Fan <wenchen@databricks.com>
Fri, 11 Jan 2019 06:52:13 +0000 (14:52 +0800)
committerWenchen Fan <wenchen@databricks.com>
Fri, 11 Jan 2019 06:52:13 +0000 (14:52 +0800)
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/22732 , we tried our best to keep the behavior of Scala UDF unchanged in Spark 2.4.

However, since Spark 3.0, Scala 2.12 is the default. The trick that was used to keep the behavior unchanged doesn't work with Scala 2.12.

This PR proposes to remove the Scala 2.11 hack, as it's not useful.

## How was this patch tested?

existing tests.

Closes #23498 from cloud-fan/udf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
docs/sql-migration-guide-upgrade.md
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
sql/core/src/main/scala/org/apache/spark/sql/functions.scala
sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala

index 4e36fd4..a2d782e 100644 (file)
@@ -43,6 +43,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring.
 
+  - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
index c8542d0..1b06835 100644 (file)
@@ -958,23 +958,6 @@ trait ScalaReflection extends Logging {
   }
 
   /**
-   * Returns the nullability of the input parameter types of the scala function object.
-   *
-   * Note that this only works with Scala 2.11, and the information returned may be inaccurate if
-   * used with a different Scala version.
-   */
-  def getParameterTypeNullability(func: AnyRef): Seq[Boolean] = {
-    if (!Properties.versionString.contains("2.11")) {
-      logWarning(s"Scala ${Properties.versionString} cannot get type nullability correctly via " +
-        "reflection, thus Spark cannot add proper input null check for UDF. To avoid this " +
-        "problem, use the typed UDF interfaces instead.")
-    }
-    val methods = func.getClass.getMethods.filter(m => m.getName == "apply" && !m.isBridge)
-    assert(methods.length == 1)
-    methods.head.getParameterTypes.map(!_.isPrimitive)
-  }
-
-  /**
    * Returns the parameter names and types for the primary constructor of this type.
    *
    * Note that it only works for scala classes with primary constructor, and currently doesn't
index fae1119..c9e0a2e 100644 (file)
@@ -54,18 +54,6 @@ case class ScalaUDF(
     udfDeterministic: Boolean = true)
   extends Expression with NonSQLExpression with UserDefinedExpression {
 
-  // The constructor for SPARK 2.1 and 2.2
-  def this(
-      function: AnyRef,
-      dataType: DataType,
-      children: Seq[Expression],
-      inputTypes: Seq[DataType],
-      udfName: Option[String]) = {
-    this(
-      function, dataType, children, ScalaReflection.getParameterTypeNullability(function),
-      inputTypes, udfName, nullable = true, udfDeterministic = true)
-  }
-
   override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)
 
   override def toString: String =
index 1b2d6c7..4d8e1c5 100644 (file)
@@ -102,17 +102,7 @@ private[sql] case class SparkUserDefinedFunction(
     // It's possible that some of the inputs don't have a specific type(e.g. `Any`),  skip type
     // check and null check for them.
     val inputTypes = inputSchemas.map(_.map(_.dataType).getOrElse(AnyDataType))
-
-    val inputsNullSafe = if (inputSchemas.isEmpty) {
-      // This is for backward compatibility of `functions.udf(AnyRef, DataType)`. We need to
-      // do reflection of the lambda function object and see if its arguments are nullable or not.
-      // This doesn't work for Scala 2.12 and we should consider removing this workaround, as Spark
-      // uses Scala 2.12 by default since 3.0.
-      ScalaReflection.getParameterTypeNullability(f)
-    } else {
-      inputSchemas.map(_.map(_.nullable).getOrElse(true))
-    }
-
+    val inputsNullSafe = inputSchemas.map(_.map(_.nullable).getOrElse(true))
     ScalaUDF(
       f,
       dataType,
index 7572cf2..1199cd8 100644 (file)
@@ -4250,6 +4250,13 @@ object functions {
    * By default the returned UDF is deterministic. To change it to nondeterministic, call the
    * API `UserDefinedFunction.asNondeterministic()`.
    *
+   * Note that, although the Scala closure can have primitive-type function argument, it doesn't
+   * work well with null values. Because the Scala closure is passed in as Any type, there is no
+   * type information for the function arguments. Without the type information, Spark may blindly
+   * pass null to the Scala closure with primitive-type argument, and the closure will see the
+   * default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`,
+   * the result is 0 for null input.
+   *
    * @param f  A closure in Scala
    * @param dataType  The output data type of the UDF
    *
index 06b9343..5ac2093 100644 (file)
@@ -423,6 +423,19 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-25044 Verify null input handling for primitive types - with udf(Any, DataType)") {
+    val f = udf((x: Int) => x, IntegerType)
+    checkAnswer(
+      Seq(new Integer(1), null).toDF("x").select(f($"x")),
+      Row(1) :: Row(0) :: Nil)
+
+    val f2 = udf((x: Double) => x, DoubleType)
+    checkAnswer(
+      Seq(new java.lang.Double(1.1), null).toDF("x").select(f2($"x")),
+      Row(1.1) :: Row(0.0) :: Nil)
+
+  }
+
   test("SPARK-26308: udf with decimal") {
     val df1 = spark.createDataFrame(
       sparkContext.parallelize(Seq(Row(new BigDecimal("2011000000000002456556")))),