[SPARK-26503][CORE] Get rid of spark.sql.legacy.timeParser.enabled
authorSean Owen <sean.owen@databricks.com>
Fri, 11 Jan 2019 14:53:12 +0000 (08:53 -0600)
committerSean Owen <sean.owen@databricks.com>
Fri, 11 Jan 2019 14:53:12 +0000 (08:53 -0600)
## What changes were proposed in this pull request?

Per discussion in #23391 (comment) this proposes to just remove the old pre-Spark-3 time parsing behavior.

This is a rebase of https://github.com/apache/spark/pull/23411

## How was this patch tested?

Existing tests.

Closes #23495 from srowen/SPARK-26503.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala

index db92552..c47b087 100644 (file)
@@ -20,12 +20,6 @@ package org.apache.spark.sql.catalyst.util
 import java.time.{Instant, ZoneId}
 import java.util.Locale
 
-import scala.util.Try
-
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.sql.internal.SQLConf
-
 sealed trait DateFormatter extends Serializable {
   def parse(s: String): Int // returns days since epoch
   def format(days: Int): String
@@ -56,43 +50,8 @@ class Iso8601DateFormatter(
   }
 }
 
-class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
-  @transient
-  private lazy val format = FastDateFormat.getInstance(pattern, locale)
-
-  override def parse(s: String): Int = {
-    val milliseconds = format.parse(s).getTime
-    DateTimeUtils.millisToDays(milliseconds)
-  }
-
-  override def format(days: Int): String = {
-    val date = DateTimeUtils.toJavaDate(days)
-    format.format(date)
-  }
-}
-
-class LegacyFallbackDateFormatter(
-    pattern: String,
-    locale: Locale) extends LegacyDateFormatter(pattern, locale) {
-  override def parse(s: String): Int = {
-    Try(super.parse(s)).orElse {
-      // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-      // compatibility.
-      Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
-    }.getOrElse {
-      // In Spark 1.5.0, we store the data as number of days since epoch in string.
-      // So, we just convert it to Int.
-      s.toInt
-    }
-  }
-}
-
 object DateFormatter {
   def apply(format: String, locale: Locale): DateFormatter = {
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateFormatter(format, locale)
-    } else {
-      new Iso8601DateFormatter(format, locale)
-    }
+    new Iso8601DateFormatter(format, locale)
   }
 }
index 8042099..10c73b2 100644 (file)
@@ -23,12 +23,6 @@ import java.time.format.DateTimeParseException
 import java.time.temporal.TemporalQueries
 import java.util.{Locale, TimeZone}
 
-import scala.util.Try
-
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.sql.internal.SQLConf
-
 sealed trait TimestampFormatter extends Serializable {
   /**
    * Parses a timestamp in a string and converts it to microseconds.
@@ -79,37 +73,8 @@ class Iso8601TimestampFormatter(
   }
 }
 
-class LegacyTimestampFormatter(
-    pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends TimestampFormatter {
-  @transient
-  private lazy val format = FastDateFormat.getInstance(pattern, timeZone, locale)
-
-  protected def toMillis(s: String): Long = format.parse(s).getTime
-
-  override def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS
-
-  override def format(us: Long): String = {
-    format.format(DateTimeUtils.toJavaTimestamp(us))
-  }
-}
-
-class LegacyFallbackTimestampFormatter(
-    pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
-  override def toMillis(s: String): Long = {
-    Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
-  }
-}
-
 object TimestampFormatter {
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackTimestampFormatter(format, timeZone, locale)
-    } else {
-      new Iso8601TimestampFormatter(format, timeZone, locale)
-    }
+    new Iso8601TimestampFormatter(format, timeZone, locale)
   }
 }
index 9804af7..c1b885a 100644 (file)
@@ -1625,13 +1625,6 @@ object SQLConf {
         "a SparkConf entry.")
       .booleanConf
       .createWithDefault(true)
-
-  val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled")
-    .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " +
-      " dates/timestamps in a locale-sensitive manner. When set to false, classes from " +
-      "java.time.* packages are used for the same purpose.")
-    .booleanConf
-    .createWithDefault(false)
 }
 
 /**
@@ -2057,8 +2050,6 @@ class SQLConf extends Serializable with Logging {
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
 
-  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
-
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
index 9a6f4f5..8ce45f0 100644 (file)
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
@@ -43,61 +42,45 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("inferring timestamp type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
-        checkTimestampType("yyyy", """{"a": "2018"}""")
-        checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
-        checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
-        checkTimestampType(
-          "yyyy-MM-dd'T'HH:mm:ss.SSS",
-          """{"a": "2018-12-02T21:04:00.123"}""")
-        checkTimestampType(
-          "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
-          """{"a": "2018-12-02T21:04:00.123567+01:00"}""")
-      }
-    }
+    checkTimestampType("yyyy", """{"a": "2018"}""")
+    checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
+    checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
+    checkTimestampType(
+      "yyyy-MM-dd'T'HH:mm:ss.SSS",
+      """{"a": "2018-12-02T21:04:00.123"}""")
+    checkTimestampType(
+      "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
+      """{"a": "2018-12-02T21:04:00.123567+01:00"}""")
   }
 
   test("prefer decimals over timestamps") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
-        checkType(
-          options = Map(
-            "prefersDecimal" -> "true",
-            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
-          ),
-          json = """{"a": "20181202.210400123"}""",
-          dt = DecimalType(17, 9)
-        )
-      }
-    }
+    checkType(
+      options = Map(
+        "prefersDecimal" -> "true",
+        "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
+      ),
+      json = """{"a": "20181202.210400123"}""",
+      dt = DecimalType(17, 9)
+    )
   }
 
   test("skip decimal type inferring") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
-        checkType(
-          options = Map(
-            "prefersDecimal" -> "false",
-            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
-          ),
-          json = """{"a": "20181202.210400123"}""",
-          dt = TimestampType
-        )
-      }
-    }
+    checkType(
+      options = Map(
+        "prefersDecimal" -> "false",
+        "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
+      ),
+      json = """{"a": "20181202.210400123"}""",
+      dt = TimestampType
+    )
   }
 
   test("fallback to string type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
-        checkType(
-          options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
-          json = """{"a": "20181202.210400123"}""",
-          dt = StringType
-        )
-      }
-    }
+    checkType(
+      options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
+      json = """{"a": "20181202.210400123"}""",
+      dt = StringType
+    )
   }
 
   test("disable timestamp inferring") {
index 8f575a3..78debb5 100644 (file)
@@ -1451,109 +1451,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     })
   }
 
-  test("backward compatibility") {
-    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
-      // This test we make sure our JSON support can read JSON data generated by previous version
-      // of Spark generated through toJSON method and JSON data source.
-      // The data is generated by the following program.
-      // Here are a few notes:
-      //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
-      //      in the JSON object.
-      //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
-      //      JSON objects generated by those Spark versions (col17).
-      //  - If the type is NullType, we do not write data out.
-
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-            StructField("f2", ArrayType(BooleanType), true) :: Nil)
-
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new TestUDT.MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      val constantValues =
-        Seq(
-          "a string in binary".getBytes(StandardCharsets.UTF_8),
-          null,
-          true,
-          1.toByte,
-          2.toShort,
-          3,
-          Long.MaxValue,
-          0.25.toFloat,
-          0.75,
-          new java.math.BigDecimal(s"1234.23456"),
-          new java.math.BigDecimal(s"1.23456"),
-          java.sql.Date.valueOf("2015-01-01"),
-          java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
-          Seq(2, 3, 4),
-          Map("a string" -> 2000L),
-          Row(4.75.toFloat, Seq(false, true)),
-          new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
-      val data =
-        Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
-
-      // Data generated by previous versions.
-      // scalastyle:off
-      val existingJSONData =
-      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-        """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-        """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-        """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-        """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-        """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-        """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
-      // scalastyle:on
-
-      // Generate data for the current version.
-      val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
-      withTempPath { path =>
-        df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
-
-        // df.toJSON will convert internal rows to external rows first and then generate
-        // JSON objects. While, df.write.format("json") will write internal rows directly.
-        val allJSON =
-        existingJSONData ++
-          df.toJSON.collect() ++
-          sparkContext.textFile(path.getCanonicalPath).collect()
-
-        Utils.deleteRecursively(path)
-        sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
-
-        // Read data back with the schema specified.
-        val col0Values =
-          Seq(
-            "Spark 1.2.2",
-            "Spark 1.3.1",
-            "Spark 1.3.1",
-            "Spark 1.4.1",
-            "Spark 1.4.1",
-            "Spark 1.5.0",
-            "Spark 1.5.0",
-            "Spark " + spark.sparkContext.version,
-            "Spark " + spark.sparkContext.version)
-        val expectedResult = col0Values.map { v =>
-          Row.fromSeq(Seq(v) ++ constantValues)
-        }
-        checkAnswer(
-          spark.read.format("json").schema(schema).load(path.getCanonicalPath),
-          expectedResult
-        )
-      }
-    }
-  }
-
   test("SPARK-11544 test pathfilter") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
@@ -2592,53 +2489,45 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("inferring timestamp type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
-        def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema
-
-        assert(schemaOf(
-          """{"a":"2018-12-17T10:11:12.123-01:00"}""",
-          """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp"))
-
-        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""")
-          === fromDDL("a string"))
-        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""")
-          === fromDDL("a string"))
-
-        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""")
-          === fromDDL("a timestamp"))
-        assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""")
-          === fromDDL("a timestamp"))
-      }
-    }
+    def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema
+
+    assert(schemaOf(
+      """{"a":"2018-12-17T10:11:12.123-01:00"}""",
+      """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp"))
+
+    assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""")
+      === fromDDL("a string"))
+    assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""")
+      === fromDDL("a string"))
+
+    assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""")
+      === fromDDL("a timestamp"))
+    assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""")
+      === fromDDL("a timestamp"))
   }
 
   test("roundtrip for timestamp type inferring") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
-        val customSchema = new StructType().add("date", TimestampType)
-        withTempDir { dir =>
-          val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
-          val timestampsWithFormat = spark.read
-            .option("timestampFormat", "dd/MM/yyyy HH:mm")
-            .json(datesRecords)
-          assert(timestampsWithFormat.schema === customSchema)
-
-          timestampsWithFormat.write
-            .format("json")
-            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
-            .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
-            .save(timestampsWithFormatPath)
-
-          val readBack = spark.read
-            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
-            .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
-            .json(timestampsWithFormatPath)
-
-          assert(readBack.schema === customSchema)
-          checkAnswer(readBack, timestampsWithFormat)
-        }
-      }
+    val customSchema = new StructType().add("date", TimestampType)
+    withTempDir { dir =>
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+      val timestampsWithFormat = spark.read
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+      assert(timestampsWithFormat.schema === customSchema)
+
+      timestampsWithFormat.write
+        .format("json")
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+        .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+        .save(timestampsWithFormatPath)
+
+      val readBack = spark.read
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+        .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+        .json(timestampsWithFormatPath)
+
+      assert(readBack.schema === customSchema)
+      checkAnswer(readBack, timestampsWithFormat)
     }
   }
 }
index 57b8966..bf6d0ea 100644 (file)
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
-import java.util.TimeZone
 
 import scala.util.Random
 
@@ -126,61 +125,59 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     } else {
       Seq(false)
     }
-    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") {
-      for (dataType <- supportedDataTypes) {
-        for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
-          val extraMessage = if (isParquetDataSource) {
-            s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
-          } else {
-            ""
-          }
-          logInfo(s"Testing $dataType data type$extraMessage")
-
-          val extraOptions = Map[String, String](
-            "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString,
-            "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX"
-          )
-
-          withTempPath { file =>
-            val path = file.getCanonicalPath
-
-            val seed = System.nanoTime()
-            withClue(s"Random data generated with the seed: ${seed}") {
-              val dataGenerator = RandomDataGenerator.forType(
-                dataType = dataType,
-                nullable = true,
-                new Random(seed)
-              ).getOrElse {
-                fail(s"Failed to create data generator for schema $dataType")
-              }
-
-              // Create a DF for the schema with random data. The index field is used to sort the
-              // DataFrame.  This is a workaround for SPARK-10591.
-              val schema = new StructType()
-                .add("index", IntegerType, nullable = false)
-                .add("col", dataType, nullable = true)
-              val rdd =
-                spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
-              val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-              df.write
-                .mode("overwrite")
-                .format(dataSourceName)
-                .option("dataSchema", df.schema.json)
-                .options(extraOptions)
-                .save(path)
-
-              val loadedDF = spark
-                .read
-                .format(dataSourceName)
-                .option("dataSchema", df.schema.json)
-                .schema(df.schema)
-                .options(extraOptions)
-                .load(path)
-                .orderBy("index")
-
-              checkAnswer(loadedDF, df)
+    for (dataType <- supportedDataTypes) {
+      for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
+        val extraMessage = if (isParquetDataSource) {
+          s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
+        } else {
+          ""
+        }
+        logInfo(s"Testing $dataType data type$extraMessage")
+
+        val extraOptions = Map[String, String](
+          "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString,
+          "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX"
+        )
+
+        withTempPath { file =>
+          val path = file.getCanonicalPath
+
+          val seed = System.nanoTime()
+          withClue(s"Random data generated with the seed: ${seed}") {
+            val dataGenerator = RandomDataGenerator.forType(
+              dataType = dataType,
+              nullable = true,
+              new Random(seed)
+            ).getOrElse {
+              fail(s"Failed to create data generator for schema $dataType")
             }
+
+            // Create a DF for the schema with random data. The index field is used to sort the
+            // DataFrame.  This is a workaround for SPARK-10591.
+            val schema = new StructType()
+              .add("index", IntegerType, nullable = false)
+              .add("col", dataType, nullable = true)
+            val rdd =
+              spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+            val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+            df.write
+              .mode("overwrite")
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .options(extraOptions)
+              .save(path)
+
+            val loadedDF = spark
+              .read
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .schema(df.schema)
+              .options(extraOptions)
+              .load(path)
+              .orderBy("index")
+
+            checkAnswer(loadedDF, df)
           }
         }
       }