o Added a dedicated logger for the transaction : TXN_LOG
authorEmmanuel Lécharny <elecharny@apache.org>
Sun, 8 Mar 2015 17:24:55 +0000 (17:24 +0000)
committerEmmanuel Lécharny <elecharny@apache.org>
Sun, 8 Mar 2015 17:24:55 +0000 (17:24 +0000)
o Removed the lock from the AbstractTransactionManager
o Added a map to keep a track of the number of times a page is written
o Fixes in the way we handle transactions in the RM
o Added some explicit generics

mavibot/src/main/java/org/apache/directory/mavibot/btree/AbstractTransactionManager.java
mavibot/src/main/java/org/apache/directory/mavibot/btree/RecordManager.java
mavibot/src/main/java/org/apache/directory/mavibot/btree/TransactionManager.java

index 4ac5c52..2798a91 100644 (file)
  */
 package org.apache.directory.mavibot.btree;
 
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * An abstract class implementing the TransactionManager interface.
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public abstract class AbstractTransactionManager implements TransactionManager
+public abstract class AbstractTransactionManager<K, V> implements TransactionManager<K, V>
 {
-    /** A lock to protect the transaction handling */
-    private ReadWriteLock transactionLock = new ReentrantReadWriteLock();
 }
index fee9172..462655c 100644 (file)
@@ -35,7 +35,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -69,6 +68,9 @@ public class RecordManager extends AbstractTransactionManager
     /** The LoggerFactory used by this class */
     protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );
 
+    /** The LoggerFactory used to trace TXN operations */
+    protected static final Logger TXN_LOG = LoggerFactory.getLogger( "TXN_LOG" );
+
     /** The LoggerFactory used by this class */
     protected static final Logger LOG_PAGES = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_PAGES" );
 
@@ -177,7 +179,7 @@ public class RecordManager extends AbstractTransactionManager
     private long previousBtreeOfBtreesOffset = NO_PAGE;
 
     /** A lock to protect the transaction handling */
-    private Lock transactionLock = new ReentrantLock();
+    private ReentrantLock transactionLock = new ReentrantLock();
 
     /** A ThreadLocalStorage used to store the current transaction */
     private static final ThreadLocal<Integer> context = new ThreadLocal<Integer>();
@@ -212,6 +214,8 @@ public class RecordManager extends AbstractTransactionManager
     /** the threshold at which the SpaceReclaimer will be run to free the copied pages */
     private int spaceReclaimerThreshold = 200;
 
+    public Map<Long, Integer> writeCounter = new HashMap<Long, Integer>();
+
 
     /**
      * Create a Record manager which will either create the underlying file
@@ -556,9 +560,22 @@ public class RecordManager extends AbstractTransactionManager
      */
     public void beginTransaction()
     {
+        if ( TXN_LOG.isDebugEnabled() )
+        {
+            TXN_LOG.debug( "Begining a new transaction on thread {}, TxnLevel {}",
+                Thread.currentThread().getName(), getTxnLevel() );
+        }
+
         // First, take the lock if it's not already taken
-        //( ( ReadWriteLock ) transactionLock ).writeLock();
-        transactionLock.lock();
+        if ( !( ( ReentrantLock ) transactionLock ).isHeldByCurrentThread() )
+        {
+            TXN_LOG.debug( "--> Lock taken" );
+            transactionLock.lock();
+        }
+        else
+        {
+            TXN_LOG.debug( "..o The current thread already holds the lock" );
+        }
 
         // Now, check the TLS state
         incrementTxnLevel();
@@ -570,13 +587,30 @@ public class RecordManager extends AbstractTransactionManager
      */
     public void commit()
     {
-        if ( !fileChannel.isOpen() )
+        // We *must* own the transactionLock
+        if ( !transactionLock.isHeldByCurrentThread() )
+        {
+            TXN_LOG.error( "This thread does not hold the transactionLock" );
+            throw new RecordManagerException( "This thread does not hold the transactionLock" );
+        }
+
+        if ( TXN_LOG.isDebugEnabled() )
         {
-            // The file has been closed, nothing remains to commit, let's get out
-            transactionLock.unlock();
+            TXN_LOG.debug( "Committing a transaction on thread {}, TxnLevel {}",
+                Thread.currentThread().getName(), getTxnLevel() );
+        }
 
+        if ( !fileChannel.isOpen() )
+        {
             // Still we have to decrement the TransactionLevel
-            decrementTxnLevel();
+            int txnLevel = decrementTxnLevel();
+
+            if ( txnLevel == 0 )
+            {
+                // We can safely release the lock
+                // The file has been closed, nothing remains to commit, let's get out
+                transactionLock.unlock();
+            }
 
             return;
         }
@@ -620,9 +654,6 @@ public class RecordManager extends AbstractTransactionManager
                 // here, we have to erase the old references to keep only the new ones.
                 updateRecordManagerHeader();
 
-                // And decrement the number of started transactions
-                decrementTxnLevel();
-
                 commitCount++;
 
                 if ( commitCount >= spaceReclaimerThreshold )
@@ -630,8 +661,14 @@ public class RecordManager extends AbstractTransactionManager
                     runReclaimer();
                 }
 
-                // Finally, release the global lock
-                transactionLock.unlock();
+                // Finally, decrement the number of started transactions
+                // and release the global lock if possible
+                int txnLevel = decrementTxnLevel();
+
+                if ( txnLevel == 0 )
+                {
+                    transactionLock.unlock();
+                }
 
                 return;
 
@@ -664,9 +701,6 @@ public class RecordManager extends AbstractTransactionManager
                 // here, we have to erase the old references to keep only the new ones.
                 updateRecordManagerHeader();
 
-                // And decrement the number of started transactions
-                decrementTxnLevel();
-
                 commitCount++;
 
                 if ( commitCount >= spaceReclaimerThreshold )
@@ -674,8 +708,14 @@ public class RecordManager extends AbstractTransactionManager
                     runReclaimer();
                 }
 
-                // Finally, release the global lock
-                transactionLock.unlock();
+                // Finally, decrement the number of started transactions
+                // and release the global lock
+                txnLevel = decrementTxnLevel();
+
+                if ( txnLevel == 0 )
+                {
+                    transactionLock.unlock();
+                }
 
                 return;
         }
@@ -689,6 +729,22 @@ public class RecordManager extends AbstractTransactionManager
 
 
     /**
+     * Get the transactionLevel, ie the number of encapsulated update ops
+     */
+    private int getTxnLevel()
+    {
+        Integer nbTxnLevel = context.get();
+
+        if ( nbTxnLevel == null )
+        {
+            return -1;
+        }
+
+        return nbTxnLevel;
+    }
+
+
+    /**
      * Increment the transactionLevel
      */
     private void incrementTxnLevel()
@@ -705,27 +761,28 @@ public class RecordManager extends AbstractTransactionManager
             context.set( nbTxnLevel + 1 );
         }
 
-        /*
-        System.out.println( "Incrementing : " + context.get() );
-        
-        if ( context.get() == 0 )
+        if ( TXN_LOG.isDebugEnabled() )
         {
-            System.out.println( "-------------" );
+            TXN_LOG.debug( "Incrementing the TxnLevel : {}", context.get() );
         }
-        */
     }
 
 
     /**
      * Decrement the transactionLevel
      */
-    private void decrementTxnLevel()
+    private int decrementTxnLevel()
     {
-        int nbTxnStarted = context.get();
+        int nbTxnStarted = context.get() - 1;
 
-        context.set( nbTxnStarted - 1 );
+        context.set( nbTxnStarted );
 
-        //System.out.println( "Incrementing : " + context.get() );
+        if ( TXN_LOG.isDebugEnabled() )
+        {
+            TXN_LOG.debug( "Decrementing the TxnLevel : {}", context.get() );
+        }
+
+        return nbTxnStarted;
     }
 
 
@@ -734,6 +791,19 @@ public class RecordManager extends AbstractTransactionManager
      */
     public void rollback()
     {
+        // We *must* own the transactionLock
+        if ( !transactionLock.isHeldByCurrentThread() )
+        {
+            TXN_LOG.error( "This thread does not hold the transactionLock" );
+            throw new RecordManagerException( "This thread does not hold the transactionLock" );
+        }
+
+        if ( TXN_LOG.isDebugEnabled() )
+        {
+            TXN_LOG.debug( "Rollbacking a new transaction on thread {}, TxnLevel {}",
+                Thread.currentThread().getName(), getTxnLevel() );
+        }
+
         // Reset the counter
         context.set( ROLLBACKED_TXN );
 
@@ -760,6 +830,8 @@ public class RecordManager extends AbstractTransactionManager
         // And restore the BTreeHeaders new Map to the current state
         revertBtreeHeaders();
 
+        // This is an all-of-nothing operation : we can't have a transaction within
+        // a transaction that would survive an inner transaction rollback.
         transactionLock.unlock();
     }
 
@@ -1948,6 +2020,7 @@ public class RecordManager extends AbstractTransactionManager
 
         try
         {
+            writeCounter.put( 0L, writeCounter.containsKey( 0L ) ? writeCounter.get( 0L ) + 1 : 1 );
             fileChannel.write( RECORD_MANAGER_HEADER_BUFFER, 0 );
         }
         catch ( IOException ioe )
@@ -2433,12 +2506,14 @@ public class RecordManager extends AbstractTransactionManager
         for ( PageIO pageIo : pageIos )
         {
             pageIo.getData().rewind();
+            long pos = pageIo.getOffset();
 
             if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) )
             {
                 LOG.debug( "Adding a page at the end of the file" );
                 // This is a page we have to add to the file
-                fileChannel.write( pageIo.getData(), fileChannel.size() );
+                pos = fileChannel.size();
+                fileChannel.write( pageIo.getData(), pos );
                 //fileChannel.force( false );
             }
             else
@@ -2448,6 +2523,8 @@ public class RecordManager extends AbstractTransactionManager
                 //fileChannel.force( false );
             }
 
+            writeCounter.put( pos, writeCounter.containsKey( pos ) ? writeCounter.get( pos ) + 1 : 1 );
+
             nbUpdatePageIOs.incrementAndGet();
 
             pageIo.getData().rewind();
@@ -3873,8 +3950,6 @@ public class RecordManager extends AbstractTransactionManager
 
     private void checkFreePages() throws EndOfFileExceededException, IOException
     {
-        System.out.println( "Checking the free pages, starting from " + Long.toHexString( firstFreePage ) );
-
         // read all the free pages, add them into a set, to be sure we don't have a cycle
         Set<Long> freePageOffsets = new HashSet<Long>();
 
@@ -3882,8 +3957,6 @@ public class RecordManager extends AbstractTransactionManager
 
         while ( currentFreePageOffset != NO_PAGE )
         {
-            System.out.println( "Next page offset :" + Long.toHexString( currentFreePageOffset ) );
-
             if ( ( currentFreePageOffset % pageSize ) != 0 )
             {
                 throw new InvalidOffsetException( "Wrong offset : " + Long.toHexString( currentFreePageOffset ) );
index aa458e7..bfa9897 100644 (file)
  */
 package org.apache.directory.mavibot.btree;
 
+
 /**
  * An interface used to manage the transactions mechanism in B-trees. Transactions are cross
  * B-trees.
  *
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public interface TransactionManager
+public interface TransactionManager<K, V>
 {
     /**
      * Starts a transaction
@@ -43,21 +44,21 @@ public interface TransactionManager
      * Rollback a transaction
      */
     void rollback();
-    
-    
+
+
     /**
      * Gets the current BtreeHeader for a given BTree.
      * 
      * @param btreeName The Btree name we are looking the BtreeHeader for
      * @return the current BTreeHeader
      */
-    BTreeHeader<?, ?> getBTreeHeader( String btreeName );
-    
-    
+    BTreeHeader<K, V> getBTreeHeader( String btreeName );
+
+
     /**
      * Updates the map of new BTreeHeaders
      * 
      * @param btreeHeader The new BtreeHeader
      */
-    void updateNewBTreeHeaders( BTreeHeader<?, ?> btreeHeader );
+    void updateNewBTreeHeaders( BTreeHeader<K, V> btreeHeader );
 }