[CARBONDATA-3236] Fix for JVM Crash for insert into new table from old table
author    manishnalla1994 <manish.nalla1994@gmail.com>
Tue, 8 Jan 2019 10:42:55 +0000 (16:12 +0530)
committer kumarvishal09 <kumarvishal1802@gmail.com>
Wed, 9 Jan 2019 11:38:51 +0000 (17:08 +0530)
Problem: Insert into a new table from an old table fails with a JVM crash for the file format (using carbondata).
This happened because the query and the load flow were assigned the same taskId; once the query finished,
it freed the unsafe memory while the insert was still in progress.
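
The following is a minimal, self-contained sketch of the race described above. All names
(UnsafeBufferPool, SharedTaskIdRace) are hypothetical and illustrative, not CarbonData APIs;
it only models how per-task cleanup keyed on a shared taskId destroys memory another flow
still depends on.

    import scala.collection.mutable

    // Hypothetical pool that tracks allocations per taskId, standing in for
    // CarbonData's unsafe memory manager.
    object UnsafeBufferPool {
      private val buffersByTask = mutable.Map[Long, mutable.Buffer[Array[Byte]]]()

      def allocate(taskId: Long, size: Int): Array[Byte] = synchronized {
        val buf = new Array[Byte](size)
        buffersByTask.getOrElseUpdate(taskId, mutable.Buffer()) += buf
        buf
      }

      // Frees everything owned by taskId. If query and load share the id, the
      // query's completion callback destroys the load's working memory too.
      def freeAll(taskId: Long): Unit = synchronized {
        buffersByTask.remove(taskId)
        ()
      }
    }

    object SharedTaskIdRace extends App {
      val taskId = 42L
      val loadBuffer = UnsafeBufferPool.allocate(taskId, 1024) // load-side allocation
      UnsafeBufferPool.freeAll(taskId)                         // query finishes first
      // The load now works on memory the pool no longer tracks; with real
      // off-heap pointers this is a use-after-free and can crash the JVM.
    }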

Solution: The file format path is a direct flow and uses on-heap (safe) memory, so the query does not need to free unsafe memory.

This closes #3056

integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala

index 8cb2ca4..f725de3 100644
@@ -410,15 +410,8 @@ class SparkCarbonFileFormat extends FileFormat
         val model = format.createQueryModel(split, hadoopAttemptContext)
         model.setConverter(new SparkDataTypeConverterImpl)
         model.setPreFetchData(false)
-        var isAdded = false
-        Option(TaskContext.get()).foreach { context =>
-          val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
-          onCompleteCallbacksField.setAccessible(true)
-          val listeners = onCompleteCallbacksField.get(context)
-            .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
-          isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
-          model.setFreeUnsafeMemory(!isAdded)
-        }
+        // As file format uses on heap, no need to free unsafe memory
+        model.setFreeUnsafeMemory(false)
         val carbonReader = if (readVector) {
           model.setDirectVectorFill(true)
           val vectorizedReader = new VectorizedCarbonRecordReader(model,
@@ -439,7 +432,7 @@ class SparkCarbonFileFormat extends FileFormat
         Option(TaskContext.get()).foreach{context =>
           context.addTaskCompletionListener(
           CarbonQueryTaskCompletionListenerImpl(
-            iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
+            iter.asInstanceOf[RecordReaderIterator[InternalRow]]))
         }
 
         if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
index eb3e42a..5547228 100644
@@ -40,7 +40,7 @@ trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
 trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
 
 case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
-    freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
+    freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     if (iter != null) {
       try {
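
The second hunk above defaults freeMemory to false, so query-side callers on the file-format
path no longer opt in to unsafe-memory cleanup. Below is a hedged, self-contained sketch of
that default-parameter pattern; CompletionListener, FakeIterator, QueryCompletionListener, and
DefaultFlagDemo are hypothetical stand-ins, not the classes from the diff.

    import scala.util.Try

    trait CompletionListener { def onCompletion(): Unit }

    // Stand-in for Spark's RecordReaderIterator, which only needs closing here.
    final class FakeIterator { def close(): Unit = println("iterator closed") }

    // Defaulting the cleanup flag to "off" means a caller cannot accidentally
    // free memory shared with a concurrent load on the same taskId.
    case class QueryCompletionListener(iter: FakeIterator,
        freeMemory: Boolean = false) extends CompletionListener {
      override def onCompletion(): Unit = {
        Try(iter.close())                  // always release the reader
        if (freeMemory) {
          println("freeing unsafe memory") // skipped on the on-heap path
        }
      }
    }

    object DefaultFlagDemo extends App {
      // Mirrors the updated call site: no freeMemory argument, safe default used.
      QueryCompletionListener(new FakeIterator).onCompletion()
    }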