[SPARK-26539][CORE] Remove spark.memory.useLegacyMode and StaticMemoryManager
author Sean Owen <sean.owen@databricks.com>
Thu, 10 Jan 2019 14:57:44 +0000 (08:57 -0600)
committer Sean Owen <sean.owen@databricks.com>
Thu, 10 Jan 2019 14:57:44 +0000 (08:57 -0600)
## What changes were proposed in this pull request?

Remove `spark.memory.useLegacyMode` and `StaticMemoryManager`. Update tests that used `StaticMemoryManager` to equivalent uses of `UnifiedMemoryManager`.
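
For configurations, the practical migration mirrors what the updated test suites below do: drop the legacy flag and fractions and express the same intent with the unified settings (`spark.memory.fraction`, `spark.memory.storageFraction`). A minimal sketch, assuming a test-style local `SparkConf` (the values are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

// Legacy settings removed by this change (no longer read or validated):
//   .set("spark.memory.useLegacyMode", "true")
//   .set("spark.storage.memoryFraction", "0.0")
// Equivalent intent under unified memory management:
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("test")
  .set("spark.memory.storageFraction", "0.0")
```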

## How was this patch tested?

Existing tests, with modifications to make them work with a different memory manager.
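
Most of the test updates follow one pattern: a suite that built a `StaticMemoryManager` directly now calls the `UnifiedMemoryManager` constructor (its `private[memory]` modifier is dropped below so tests can do this), passing the desired limit as both the max heap size and the on-heap storage region size. A rough sketch of that pattern, with a hypothetical `makeMemoryManager` helper standing in for the per-suite code; it has to live under the `org.apache.spark` package because the class is `private[spark]`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.memory.UnifiedMemoryManager

// Hypothetical helper mirroring the updated suites: `maxMem` bounds both the
// heap and the on-heap storage region, much as the removed call
// `new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)` bounded storage.
def makeMemoryManager(conf: SparkConf, maxMem: Long): UnifiedMemoryManager = {
  // UnifiedMemoryManager(conf, maxHeapMemory, onHeapStorageRegionSize, numCores)
  new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
}
```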

Closes #23457 from srowen/SPARK-26539.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
19 files changed:
core/src/main/scala/org/apache/spark/SparkConf.scala
core/src/main/scala/org/apache/spark/SparkEnv.scala
core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala [deleted file]
core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
core/src/main/scala/org/apache/spark/memory/package.scala
core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala [deleted file]
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
docs/configuration.md
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 681e437..f27df50 100644
@@ -532,38 +532,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     }
 
     // Validate memory fractions
-    val deprecatedMemoryKeys = Seq(
-      "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction",
-      "spark.shuffle.safetyFraction",
-      "spark.storage.unrollFraction",
-      "spark.storage.safetyFraction")
-    val memoryKeys = Seq(
-      "spark.memory.fraction",
-      "spark.memory.storageFraction") ++
-      deprecatedMemoryKeys
-    for (key <- memoryKeys) {
+    for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) {
       val value = getDouble(key, 0.5)
       if (value > 1 || value < 0) {
         throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
       }
     }
 
-    // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
-    val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
-    val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
-    if (!legacyMemoryManagement) {
-      val keyset = deprecatedMemoryKeys.toSet
-      val detected = settings.keys().asScala.filter(keyset.contains)
-      if (detected.nonEmpty) {
-        logWarning("Detected deprecated memory fraction settings: " +
-          detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
-          "memory management are unified. All memory fractions used in the old model are " +
-          "now deprecated and no longer read. If you wish to use the old memory management, " +
-          s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
-      }
-    }
-
     if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
       val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
         "instead use \"yarn\" with specified deploy mode."
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9222781..ba5ed8a 100644
@@ -31,7 +31,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -322,13 +322,7 @@ object SparkEnv extends Logging {
       shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
-    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
-    val memoryManager: MemoryManager =
-      if (useLegacyMemoryManager) {
-        new StaticMemoryManager(conf, numUsableCores)
-      } else {
-        UnifiedMemoryManager(conf, numUsableCores)
-      }
+    val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
 
     val blockManagerPort = if (isDriver) {
       conf.get(DRIVER_BLOCK_MANAGER_PORT)
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
deleted file mode 100644
index 7e052c0..0000000
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.memory
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
-import org.apache.spark.internal.config.Tests.TEST_MEMORY
-import org.apache.spark.storage.BlockId
-
-/**
- * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
- *
- * The sizes of the execution and storage regions are determined through
- * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
- * regions are cleanly separated such that neither usage can borrow memory from the other.
- */
-private[spark] class StaticMemoryManager(
-    conf: SparkConf,
-    maxOnHeapExecutionMemory: Long,
-    override val maxOnHeapStorageMemory: Long,
-    numCores: Int)
-  extends MemoryManager(
-    conf,
-    numCores,
-    maxOnHeapStorageMemory,
-    maxOnHeapExecutionMemory) {
-
-  def this(conf: SparkConf, numCores: Int) {
-    this(
-      conf,
-      StaticMemoryManager.getMaxExecutionMemory(conf),
-      StaticMemoryManager.getMaxStorageMemory(conf),
-      numCores)
-  }
-
-  // The StaticMemoryManager does not support off-heap storage memory:
-  offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
-  offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
-
-  // Max number of bytes worth of blocks to evict when unrolling
-  private val maxUnrollMemory: Long = {
-    (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
-  }
-
-  override def maxOffHeapStorageMemory: Long = 0L
-
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      memoryMode: MemoryMode): Boolean = synchronized {
-    require(memoryMode != MemoryMode.OFF_HEAP,
-      "StaticMemoryManager does not support off-heap storage memory")
-    if (numBytes > maxOnHeapStorageMemory) {
-      // Fail fast if the block simply won't fit
-      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
-        s"memory limit ($maxOnHeapStorageMemory bytes)")
-      false
-    } else {
-      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
-    }
-  }
-
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      memoryMode: MemoryMode): Boolean = synchronized {
-    require(memoryMode != MemoryMode.OFF_HEAP,
-      "StaticMemoryManager does not support off-heap unroll memory")
-    if (numBytes > maxOnHeapStorageMemory) {
-      // Fail fast if the block simply won't fit
-      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
-        s"memory limit ($maxOnHeapStorageMemory bytes)")
-      false
-    } else {
-      val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
-      val freeMemory = onHeapStorageMemoryPool.memoryFree
-      // When unrolling, we will use all of the existing free memory, and, if necessary,
-      // some extra space freed from evicting cached blocks. We must place a cap on the
-      // amount of memory to be evicted by unrolling, however, otherwise unrolling one
-      // big block can blow away the entire cache.
-      val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
-      // Keep it within the range 0 <= X <= maxNumBytesToFree
-      val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
-      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
-    }
-  }
-
-  private[memory]
-  override def acquireExecutionMemory(
-      numBytes: Long,
-      taskAttemptId: Long,
-      memoryMode: MemoryMode): Long = synchronized {
-    memoryMode match {
-      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-    }
-  }
-}
-
-
-private[spark] object StaticMemoryManager {
-
-  private val MIN_MEMORY_BYTES = 32 * 1024 * 1024
-
-  /**
-   * Return the total amount of memory available for the storage region, in bytes.
-   */
-  private def getMaxStorageMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.get(TEST_MEMORY)
-    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
-    (systemMaxMemory * memoryFraction * safetyFraction).toLong
-  }
-
-  /**
-   * Return the total amount of memory available for the execution region, in bytes.
-   */
-  private def getMaxExecutionMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.get(TEST_MEMORY)
-
-    if (systemMaxMemory < MIN_MEMORY_BYTES) {
-      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
-        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
-        s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
-    }
-    if (conf.contains(config.EXECUTOR_MEMORY)) {
-      val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
-      if (executorMemory < MIN_MEMORY_BYTES) {
-        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
-          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
-          s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
-      }
-    }
-    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
-    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (systemMaxMemory * memoryFraction * safetyFraction).toLong
-  }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 7801bb8..a0fbbbd 100644
@@ -46,7 +46,7 @@ import org.apache.spark.storage.BlockId
  *                          it if necessary. Cached blocks can be evicted only if actual
  *                          storage memory usage exceeds this region.
  */
-private[spark] class UnifiedMemoryManager private[memory] (
+private[spark] class UnifiedMemoryManager(
     conf: SparkConf,
     val maxHeapMemory: Long,
     onHeapStorageRegionSize: Long,
diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala b/core/src/main/scala/org/apache/spark/memory/package.scala
index 3d00cd9..7f78219 100644
@@ -61,15 +61,10 @@ package org.apache.spark
  * }}}
  *
  *
- * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how
- * they handle the sizing of their memory pools:
+ * There is one implementation of [[org.apache.spark.memory.MemoryManager]]:
  *
- *  - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft
+ *  - [[org.apache.spark.memory.UnifiedMemoryManager]] enforces soft
  *    boundaries between storage and execution memory, allowing requests for memory in one region
  *    to be fulfilled by borrowing memory from the other.
- *  - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage
- *    and execution memory by statically partitioning Spark's memory and preventing storage and
- *    execution from borrowing memory from each other. This mode is retained only for legacy
- *    compatibility purposes.
  */
 package object memory
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index a0664b3..dc1fe77 100644
@@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite {
   @Test
   public void leakedPageMemoryIsDetected() {
     final TaskMemoryManager manager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set("spark.memory.offHeap.enabled", "false"),
         Long.MAX_VALUE,
         Long.MAX_VALUE,
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 9ad2e9a..6976464 100644
@@ -145,8 +145,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
   encryptionTest("Cache broadcast to disk") { conf =>
     conf.setMaster("local")
       .setAppName("test")
-      .set("spark.memory.useLegacyMode", "true")
-      .set("spark.storage.memoryFraction", "0.0")
+      .set("spark.memory.storageFraction", "0.0")
     sc = new SparkContext(conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
@@ -173,8 +172,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName("test")
-      .set("spark.memory.useLegacyMode", "true")
-      .set("spark.storage.memoryFraction", "0.0")
+      .set("spark.memory.storageFraction", "0.0")
 
     sc = new SparkContext(conf)
     val list = List[Int](1, 2, 3, 4)
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
deleted file mode 100644
index c3275ad..0000000
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.memory
-
-import org.mockito.Mockito.when
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
-import org.apache.spark.internal.config.Tests.TEST_MEMORY
-import org.apache.spark.storage.TestBlockId
-import org.apache.spark.storage.memory.MemoryStore
-
-class StaticMemoryManagerSuite extends MemoryManagerSuite {
-  private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
-
-  /**
-   * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
-   */
-  private def makeThings(
-      maxExecutionMem: Long,
-      maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
-    val mm = new StaticMemoryManager(
-      conf,
-      maxOnHeapExecutionMemory = maxExecutionMem,
-      maxOnHeapStorageMemory = maxStorageMem,
-      numCores = 1)
-    val ms = makeMemoryStore(mm)
-    (mm, ms)
-  }
-
-  override protected def createMemoryManager(
-      maxOnHeapExecutionMemory: Long,
-      maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
-    new StaticMemoryManager(
-      conf.clone
-        .set("spark.memory.fraction", "1")
-        .set(TEST_MEMORY, maxOnHeapExecutionMemory)
-        .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory),
-      maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
-      maxOnHeapStorageMemory = 0,
-      numCores = 1)
-  }
-
-  test("basic execution memory") {
-    val maxExecutionMem = 1000L
-    val taskAttemptId = 0L
-    val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
-    val memoryMode = MemoryMode.ON_HEAP
-    assert(mm.executionMemoryUsed === 0L)
-    assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
-    assert(mm.executionMemoryUsed === 10L)
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
-    // Acquire up to the max
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L)
-    assert(mm.executionMemoryUsed === maxExecutionMem)
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
-    assert(mm.executionMemoryUsed === maxExecutionMem)
-    mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
-    assert(mm.executionMemoryUsed === 200L)
-    // Acquire after release
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
-    assert(mm.executionMemoryUsed === 201L)
-    // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode)
-    assert(mm.executionMemoryUsed === 0L)
-  }
-
-  test("basic storage memory") {
-    val maxStorageMem = 1000L
-    val dummyBlock = TestBlockId("you can see the world you brought to live")
-    val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    val memoryMode = MemoryMode.ON_HEAP
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 10L)
-
-    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 110L)
-    // Acquire more than the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 110L)
-    // Acquire up to the max, requests after this are still granted due to LRU eviction
-    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
-    assert(mm.storageMemoryUsed === 1000L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 1L)
-    assert(evictedBlocks.nonEmpty)
-    evictedBlocks.clear()
-    // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at
-    // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
-    // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
-    assert(mm.storageMemoryUsed === 1000L)
-    mm.releaseStorageMemory(800L, memoryMode)
-    assert(mm.storageMemoryUsed === 200L)
-    // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 201L)
-    mm.releaseAllStorageMemory()
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 1L)
-    // Release beyond what was acquired
-    mm.releaseStorageMemory(100L, memoryMode)
-    assert(mm.storageMemoryUsed === 0L)
-  }
-
-  test("execution and storage isolation") {
-    val maxExecutionMem = 200L
-    val maxStorageMem = 1000L
-    val taskAttemptId = 0L
-    val dummyBlock = TestBlockId("ain't nobody love like you do")
-    val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
-    val memoryMode = MemoryMode.ON_HEAP
-    // Only execution memory should increase
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.executionMemoryUsed === 100L)
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 100L)
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.executionMemoryUsed === 200L)
-    // Only storage memory should increase
-    assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 50L)
-    assert(mm.executionMemoryUsed === 200L)
-    // Only execution memory should be released
-    mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode)
-    assert(mm.storageMemoryUsed === 50L)
-    assert(mm.executionMemoryUsed === 67L)
-    // Only storage memory should be released
-    mm.releaseAllStorageMemory()
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.executionMemoryUsed === 67L)
-  }
-
-  test("unroll memory") {
-    val maxStorageMem = 1000L
-    val dummyBlock = TestBlockId("lonely water")
-    val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    val memoryMode = MemoryMode.ON_HEAP
-    assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode))
-    when(ms.currentUnrollMemory).thenReturn(100L)
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 100L)
-    mm.releaseUnrollMemory(40L, memoryMode)
-    assert(mm.storageMemoryUsed === 60L)
-    when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 860L)
-    // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
-    // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes.
-    // Requesting 240 more bytes of unroll memory will leave our total unroll memory at
-    // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted.
-    assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
-    when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
-    assert(mm.storageMemoryUsed === 1000L)
-    evictedBlocks.clear()
-    // We already have 300 bytes of unroll memory, so requesting 150 more will leave us
-    // above the 400-byte limit. Since there is not enough free memory, this request will
-    // fail even after evicting as much as we can (400 - 300 = 100 bytes).
-    assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
-    assert(mm.storageMemoryUsed === 900L)
-    // Release beyond what was acquired
-    mm.releaseUnrollMemory(maxStorageMem, memoryMode)
-    assert(mm.storageMemoryUsed === 0L)
-  }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 480e07f..2250ae2 100644
@@ -91,7 +91,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     conf.set(IS_TESTING, true)
     conf.set("spark.memory.fraction", "1")
     conf.set("spark.memory.storageFraction", "1")
-    conf.set("spark.storage.unrollFraction", "0.4")
     conf.set("spark.storage.unrollMemoryThreshold", "512")
 
     // to make a replication attempt to inactive store fail fast
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bda8136..c232641 100644
@@ -120,7 +120,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set("spark.memory.fraction", "1")
       .set("spark.memory.storageFraction", "1")
       .set("spark.kryoserializer.buffer", "1m")
-      .set("spark.storage.unrollFraction", "0.4")
       .set("spark.storage.unrollMemoryThreshold", "512")
 
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b02af2b..7cdcd0f 100644
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 
 import org.apache.spark._
-import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
 import org.apache.spark.util._
@@ -39,7 +39,6 @@ class MemoryStoreSuite
   with ResetSystemProperties {
 
   var conf: SparkConf = new SparkConf(false)
-    .set("spark.storage.unrollFraction", "0.4")
     .set("spark.storage.unrollMemoryThreshold", "512")
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
@@ -60,7 +59,7 @@ class MemoryStoreSuite
   }
 
   def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+    val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
     val blockInfoManager = new BlockInfoManager
     val blockEvictionHandler = new BlockEvictionHandler {
       var memoryStore: MemoryStore = _
@@ -239,7 +238,7 @@ class MemoryStoreSuite
   }
 
   test("safely unroll blocks through putIteratorAsBytes") {
-    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val (memoryStore, blockInfoManager) = makeMemoryStore(8400)
     val smallList = List.fill(40)(new Array[Byte](100))
     val bigList = List.fill(40)(new Array[Byte](1000))
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
@@ -290,11 +289,11 @@ class MemoryStoreSuite
     blockInfoManager.removeBlock("b3")
     putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
 
-    // Unroll huge block with not enough space. This should fail.
+    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
-    assert(memoryStore.contains("b2"))
+    assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -417,7 +416,7 @@ class MemoryStoreSuite
     val bytesPerSmallBlock = memStoreSize / numInitialBlocks
     def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = {
       val tc = TaskContext.empty()
-      val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1)
+      val memManager = new UnifiedMemoryManager(conf, memStoreSize, memStoreSize.toInt, 1)
       val blockInfoManager = new BlockInfoManager
       blockInfoManager.registerTask(tc.taskAttemptId)
       var droppedSoFar = 0
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 6211399..1e03998 100644
@@ -551,12 +551,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
 
   test("force to spill for external aggregation") {
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.memoryFraction", "0.01")
-      .set("spark.memory.useLegacyMode", "true")
-      .set(TEST_MEMORY, 100000000L)
+      .set("spark.memory.storageFraction", "0.999")
+      .set(TEST_MEMORY, 471859200L)
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e5.toInt
+    val N = 200000
     sc.parallelize(1 to N, 2)
       .map { i => (i, i) }
       .groupByKey()
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index aa400dd..14148e0 100644
@@ -638,12 +638,11 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
   test("force to spill for external sorter") {
     val conf = createSparkConf(loadDefaults = false, kryo = false)
-      .set("spark.shuffle.memoryFraction", "0.01")
-      .set("spark.memory.useLegacyMode", "true")
-      .set(TEST_MEMORY, 100000000L)
+      .set("spark.memory.storageFraction", "0.999")
+      .set(TEST_MEMORY, 471859200L)
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e5.toInt
+    val N = 200000
     val p = new org.apache.spark.HashPartitioner(2)
     val p2 = new org.apache.spark.HashPartitioner(3)
     sc.parallelize(1 to N, 3)
diff --git a/docs/configuration.md b/docs/configuration.md
index 33148ed..3c383ee 100644
@@ -1199,51 +1199,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.memory.useLegacyMode</code></td>
-  <td>false</td>
-  <td>
-    Whether to enable the legacy memory management mode used in Spark 1.5 and before.
-    The legacy mode rigidly partitions the heap space into fixed-size regions,
-    potentially leading to excessive spilling if the application was not tuned.
-    The following deprecated memory fraction configurations are not read unless this is enabled:
-    <code>spark.shuffle.memoryFraction</code><br>
-    <code>spark.storage.memoryFraction</code><br>
-    <code>spark.storage.unrollFraction</code>
-  </td>
-</tr>
-<tr>
-  <td><code>spark.shuffle.memoryFraction</code></td>
-  <td>0.2</td>
-  <td>
-    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
-    Fraction of Java heap to use for aggregation and cogroups during shuffles.
-    At any given time, the collective size of
-    all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
-    begin to spill to disk. If spills are often, consider increasing this value at the expense of
-    <code>spark.storage.memoryFraction</code>.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.storage.memoryFraction</code></td>
-  <td>0.6</td>
-  <td>
-    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
-    Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
-    generation of objects in the JVM, which by default is given 0.6 of the heap, but you can
-    increase it if you configure your own old generation size.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.storage.unrollFraction</code></td>
-  <td>0.2</td>
-  <td>
-    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
-    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
-    This is dynamically allocated by dropping existing blocks when there is not enough free
-    storage space to unroll the new block in its entirety.
-  </td>
-</tr>
-<tr>
   <td><code>spark.storage.replication.proactive</code></td>
   <td>false</td>
   <td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 7c21062..a708926 100644
@@ -94,7 +94,7 @@ private[execution] object HashedRelation {
       taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
     val mm = Option(taskMemoryManager).getOrElse {
       new TaskMemoryManager(
-        new StaticMemoryManager(
+        new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
@@ -227,7 +227,7 @@ private[joins] class UnsafeHashedRelation(
     // TODO(josh): This needs to be revisited before we merge this patch; making this change now
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -392,7 +392,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   def this() = {
     this(
       new TaskMemoryManager(
-        new StaticMemoryManager(
+        new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index b7d2898..334f027 100644
@@ -22,7 +22,7 @@ import java.util.HashMap
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
 import org.apache.spark.sql.execution.vectorized.AggregateHashMap
@@ -473,7 +473,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
           value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
           value.setInt(0, 555)
           val taskMemoryManager = new TaskMemoryManager(
-            new StaticMemoryManager(
+            new UnifiedMemoryManager(
               new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
               Long.MaxValue,
               Long.MaxValue,
@@ -504,7 +504,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       Seq("off", "on").foreach { heap =>
         benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
           val taskMemoryManager = new TaskMemoryManager(
-            new StaticMemoryManager(
+            new UnifiedMemoryManager(
               new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
                 .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
               Long.MaxValue,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
index bdf753d..0b356a9 100644
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
@@ -43,7 +43,7 @@ object HashedRelationMetricsBenchmark extends SqlBasedBenchmark {
       val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows, output = output)
       benchmark.addCase("LongToUnsafeRowMap") { iter =>
         val taskMemoryManager = new TaskMemoryManager(
-          new StaticMemoryManager(
+          new UnifiedMemoryManager(
             new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
             Long.MaxValue,
             Long.MaxValue,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index d9b34dc..7b55e83 100644
@@ -23,7 +23,7 @@ import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -36,7 +36,7 @@ import org.apache.spark.util.collection.CompactBuffer
 class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   val mm = new TaskMemoryManager(
-    new StaticMemoryManager(
+    new UnifiedMemoryManager(
       new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
       Long.MaxValue,
       Long.MaxValue,
@@ -85,7 +85,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("test serialization empty hash map") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -157,7 +157,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("LongToUnsafeRowMap with very wide range") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -202,7 +202,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("LongToUnsafeRowMap with random keys") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -256,7 +256,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("SPARK-24257: insert big values into LongToUnsafeRowMap") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index fe65353..d1a6e8a 100644
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
@@ -215,8 +215,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
   test("Test Block - isFullyConsumed") {
     val sparkConf = new SparkConf().set("spark.app.id", "streaming-test")
     sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
-    // spark.storage.unrollFraction set to 0.4 for BlockManager
-    sparkConf.set("spark.storage.unrollFraction", "0.4")
 
     sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption)
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
@@ -282,7 +280,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
       maxMem: Long,
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+    val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)