[BAHIR-233] Add SNS message support for SQS streaming source (#97)
[bahir.git] / sql-streaming-sqs / src / main / scala / org / apache / spark / sql / streaming / sqs / SqsSourceOptions.scala
index a4c0cc1ae67afdec7af8de7fa69e92de5125137b..0c2eda0513fbfd15c53c874dcca248dd7ca3431b 100644 (file)
@@ -28,6 +28,15 @@ import org.apache.spark.util.Utils
  */
 class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging {
 
+  object S3MessageWrapper extends Enumeration {
+    type MessageFormat = Value
+    val None, SNS = Value
+
+    def withNameOpt(opt: String): Option[Value] = {
+      values.find(_.toString.toLowerCase == opt.toLowerCase)
+    }
+  }
+
   def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
 
   val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
@@ -92,6 +101,13 @@ class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging {
     throw new IllegalArgumentException("Specifying file format is mandatory with sqs source")
   }
 
+  val messageWrapper: S3MessageWrapper.Value = parameters.get("messageWrapper").map( str =>
+    S3MessageWrapper.withNameOpt(str).getOrElse({
+      throw new IllegalArgumentException(s"Invalid value '$str' for option 'messageWrapper', " +
+        s"must be one of [${S3MessageWrapper.values.mkString(", ")}]")
+    })
+  ).getOrElse(S3MessageWrapper.None)
+
   val ignoreFileDeletion: Boolean = withBooleanParameter("ignoreFileDeletion", false)
 
    /**