[CARBONDATA-3265] Fixed memory leak in Range Sort
author    shivamasn <shivamasn17@gmail.com>
Tue, 22 Jan 2019 09:58:10 +0000 (15:28 +0530)
committer kunal642 <kunalkapoor642@gmail.com>
Thu, 24 Jan 2019 11:44:38 +0000 (17:14 +0530)
In range sort, unsafe memory allocated by a task was not freed when the task failed, leaking off-heap memory. The write closures now register a task completion listener that clears the task's unsafe memory, so it is reclaimed whether the task succeeds or fails.

This closes #3095
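
For illustration, a minimal standalone sketch of the listener pattern this fix applies. The names CompletionListenerSketch, freeBuffers, and write are hypothetical: freeBuffers stands in for CommonUtil.clearUnsafeMemory, and Spark's task attempt id stands in for the Carbon task id. Spark invokes completion listeners on success, failure, and cancellation alike, which is what closes the leak.

    import org.apache.spark.{SparkContext, TaskContext}

    object CompletionListenerSketch {
      // Stand-in for CommonUtil.clearUnsafeMemory: release any off-heap
      // allocations tracked against this task id.
      def freeBuffers(taskId: Long): Unit =
        println(s"Releasing unsafe memory for task $taskId")

      def write(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(1 to 100, 4)
        sc.runJob(rdd, (context: TaskContext, rows: Iterator[Int]) => {
          // Register the listener before doing any work: it runs whether
          // the task succeeds or throws, so memory is always reclaimed.
          context.addTaskCompletionListener { _ =>
            freeBuffers(context.taskAttemptId())
          }
          rows.foreach(_ => ()) // stand-in for the write step
        })
      }
    }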

integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala

index a5d354a..77d0d84 100644
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark.load
 import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, RangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -29,18 +29,19 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -127,9 +128,11 @@ object DataLoadProcessBuilderOnSpark {
       }
 
     // 4. Write
-    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+      setTaskListener()
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf.value.value))
+        writeStepRowCounter, conf.value.value)
+    })
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
@@ -221,9 +224,11 @@ object DataLoadProcessBuilderOnSpark {
       .map(_._2)
 
     // 4. Sort and Write data
-    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+      setTaskListener()
       DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf.value.value))
+        writeStepRowCounter, conf.value.value)
+    })
 
     // Log the number of rows in each step
     LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
@@ -340,6 +345,16 @@ object DataLoadProcessBuilderOnSpark {
       new PrimtiveOrdering(column.getDataType)
     }
   }
+
+  def setTaskListener(): Unit = {
+    TaskContext.get.addTaskCompletionListener { _ =>
+      CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    val carbonTaskInfo = new CarbonTaskInfo
+    carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
+  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {