Flaky test_rolling_upgrade
[cassandra-dtest.git] / transient_replication_ring_test.py
index a519c62a46b158723c31931edf50375c4a1ed6ed..5a20952bd935349e099bfea068b4c09606f86675 100644
@@ -3,7 +3,9 @@ import types
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
+from ccmlib.cluster import DEFAULT_CLUSTER_WAIT_TIMEOUT_IN_SECS
 from ccmlib.node import Node
+from ccmlib.node import NODE_WAIT_TIMEOUT_IN_SECS
 
 from dtest import Tester
 from tools.assertions import (assert_all)
@@ -21,6 +23,7 @@ logging.getLogger('cassandra').setLevel(logging.CRITICAL)
 
 NODELOCAL = 11
 
+
 def jmx_start(to_start, **kwargs):
     kwargs['jvm_args'] = kwargs.get('jvm_args', []) + ['-XX:-PerfDisableSharedMem']
     to_start.start(**kwargs)
@@ -34,10 +37,12 @@ def repair_nodes(nodes):
     for node in nodes:
         node.nodetool('repair -pr')
 
+
 def cleanup_nodes(nodes):
     for node in nodes:
         node.nodetool('cleanup')
 
+
 def patch_start(startable):
     old_start = startable.start
 
@@ -67,8 +72,10 @@ class TestTransientReplicationRing(Tester):
     def point_select_statement(self):
         return SimpleStatement(self.point_select(), consistency_level=NODELOCAL)
 
-    def check_expected(self, sessions, expected, node=[i for i in range(0,1000)], cleanup=False):
+    def check_expected(self, sessions, expected, node=None, cleanup=False):
         """Check that each node has the expected values present"""
+        if node is None:
+            node = list(range(1000))
         for idx, session, expect, node in zip(range(0, 1000), sessions, expected, node):
             print("Checking idx " + str(idx))
             print(str([row for row in session.execute(self.select_statement())]))
@@ -84,10 +91,10 @@ class TestTransientReplicationRing(Tester):
         for i in range(0, 40):
             count = 0
             for session in sessions:
-                for row in session.execute(self.point_select_statement(), ["%05d" % i]):
+                for _ in session.execute(self.point_select_statement(), ["%05d" % i]):
                     count += 1
             if exactly:
-                assert count == exactly, "Wrong replication for %05d should be exactly" % (i, exactly)
+                assert count == exactly, "Wrong replication for %05d should be exactly %d" % (i, exactly)
             if gte:
                 assert count >= gte, "Count for %05d should be >= %d" % (i, gte)
             if lte:
@@ -106,12 +113,13 @@ class TestTransientReplicationRing(Tester):
                                                        'num_tokens': 1,
                                                        'commitlog_sync_period_in_ms': 500,
                                                        'enable_transient_replication': True,
-                                                       'partitioner' : 'org.apache.cassandra.dht.OrderPreservingPartitioner'})
+                                                       'partitioner': 'org.apache.cassandra.dht.OrderPreservingPartitioner'})
         print("CLUSTER INSTALL DIR: ")
         print(self.cluster.get_install_dir())
         self.cluster.populate(3, tokens=self.tokens, debug=True, install_byteman=True)
         # self.cluster.populate(3, debug=True, install_byteman=True)
-        self.cluster.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=['-Dcassandra.enable_nodelocal_queries=true'])
+        self.cluster.start(jvm_args=['-Dcassandra.enable_nodelocal_queries=true'],
+                           timeout=DEFAULT_CLUSTER_WAIT_TIMEOUT_IN_SECS * 2)
 
         # enable shared memory
         for node in self.cluster.nodelist():
@@ -131,17 +139,17 @@ class TestTransientReplicationRing(Tester):
         print("CREATE KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
         session.execute(
             "CREATE TABLE %s.%s (pk varchar, ck int, value int, PRIMARY KEY (pk, ck)) WITH speculative_retry = 'NEVER' AND additional_write_policy = 'NEVER' AND read_repair = 'NONE'" % (
-            self.keyspace, self.table))
+                self.keyspace, self.table))
 
     def quorum(self, session, stmt_str):
         return session.execute(SimpleStatement(stmt_str, consistency_level=ConsistencyLevel.QUORUM))
 
     def insert_row(self, pk, ck, value, session=None, node=None):
         session = session or self.exclusive_cql_connection(node or self.node1)
-        #token = BytesToken.from_key(pack('>i', pk)).value
-        #assert token < BytesToken.from_string(self.tokens[0]).value or BytesToken.from_string(self.tokens[-1]).value < token   # primary replica should be node1
-        #TODO Is quorum really right? I mean maybe we want ALL with retries since we really don't want to the data
-        #not at a replica unless it is intentional
+        # token = BytesToken.from_key(pack('>i', pk)).value
+        # assert token < BytesToken.from_string(self.tokens[0]).value or BytesToken.from_string(self.tokens[-1]).value < token   # primary replica should be node1
+        # TODO Is quorum really right? Maybe we want ALL with retries, since we really don't want the data
+        # to be missing from a replica unless it is intentional
         self.quorum(session, "INSERT INTO %s.%s (pk, ck, value) VALUES ('%05d', %s, %s)" % (self.keyspace, self.table, pk, ck, value))
 
     @flaky(max_runs=1)
@@ -161,19 +169,19 @@ class TestTransientReplicationRing(Tester):
                     gen_expected(range(12, 31, 2))]
         self.check_expected(sessions, expected)
 
-        #Make sure at least a little data is repaired, this shouldn't move data anywhere
+        # Make sure at least a little data is repaired; this shouldn't move data anywhere
         repair_nodes(nodes)
 
         self.check_expected(sessions, expected)
 
-        #Ensure that there is at least some transient data around, because of this if it's missing after bootstrap
-        #We know we failed to get it from the transient replica losing the range entirely
+        # Ensure that there is at least some transient data around; that way, if it's missing after bootstrap,
+        # we know we failed to get it from the transient replica that lost the range entirely
         nodes[1].stop(wait_other_notice=True)
 
         for i in range(1, 40, 2):
             self.insert_row(i, i, i, main_session)
 
-        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        nodes[1].start(wait_for_binary_proto=True)
 
         sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
 
@@ -181,32 +189,32 @@ class TestTransientReplicationRing(Tester):
                     gen_expected(range(0, 21, 2), range(32, 40, 2)),
                     gen_expected(range(1, 11, 2), range(11, 31), range(31, 40, 2))]
 
-        #Every node should have some of its fully replicated data and one and two should have some transient data
+        # Every node should have some of its fully replicated data, and nodes 1 and 2 should have some transient data
         self.check_expected(sessions, expected)
 
         node4 = new_node(self.cluster, bootstrap=True, token='00040')
         patch_start(node4)
         nodes.append(node4)
-        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        node4.start(wait_for_binary_proto=True)
 
         expected.append(gen_expected(range(11, 20, 2), range(21, 40)))
         sessions.append(self.exclusive_cql_connection(node4))
 
-        #Because repair was never run and nodes had transient data it will have data for transient ranges (node1, 11-20)
+        # Because repair was never run and the nodes had transient data, node4 will have data for transient ranges (node1, 11-20)
         assert_all(sessions[3],
                    self.select(),
                    expected[3],
                    cl=NODELOCAL)
 
-        #Node1 no longer transiently replicates 11-20, so cleanup will clean it up
-        #Node1 also now transiently replicates 21-30 and half the values in that range were repaired
+        # Node1 no longer transiently replicates 11-20, so cleanup will clean it up
+        # Node1 also now transiently replicates 21-30 and half the values in that range were repaired
         expected[0] = gen_expected(range(0, 11), range(21, 30, 2), range(31, 40))
-        #Node2 still missing data since it was down during some insertions, it also lost some range (31-40)
+        # Node2 is still missing data since it was down during some insertions; it also lost some of its range (31-40)
         expected[1] = gen_expected(range(0, 21, 2))
         expected[2] = gen_expected(range(1, 11, 2), range(11, 31))
 
-        #Cleanup should only impact if a node lost a range entirely or started to transiently replicate it and the data
-        #was repaired
+        # Cleanup should only have an effect if a node lost a range entirely, or started to transiently replicate it and
+        # the data was repaired
         self.check_expected(sessions, expected, nodes, cleanup=True)
 
         repair_nodes(nodes)
@@ -218,16 +226,15 @@ class TestTransientReplicationRing(Tester):
 
         self.check_expected(sessions, expected, nodes, cleanup=True)
 
-        #Every value should be replicated exactly 2 times
+        # Every value should be replicated exactly 2 times
         self.check_replication(sessions, exactly=2)
 
-    @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def move_test(self, move_token, expected_after_move, expected_after_repair):
         """Helper method to run a move test cycle"""
         node4 = new_node(self.cluster, bootstrap=True, token='00040')
         patch_start(node4)
-        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        node4.start(wait_for_binary_proto=NODE_WAIT_TIMEOUT_IN_SECS * 2)
         main_session = self.patient_cql_connection(self.node1)
         nodes = [self.node1, self.node2, self.node3, node4]
 
@@ -246,7 +253,7 @@ class TestTransientReplicationRing(Tester):
             print("Inserting " + str(i))
             self.insert_row(i, i, i, main_session)
 
-        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        nodes[1].start(wait_for_binary_proto=True)
         sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3, node4]]
 
         expected = [gen_expected(range(0, 11), range(31, 40)),
@@ -267,7 +274,6 @@ class TestTransientReplicationRing(Tester):
         self.check_expected(sessions, expected_after_repair, nodes, cleanup=True)
         self.check_replication(sessions, exactly=2)
 
-
     @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_move_forwards_between_and_cleanup(self):
@@ -275,7 +281,7 @@ class TestTransientReplicationRing(Tester):
         move_token = '00025'
         expected_after_move = [gen_expected(range(0, 26), range(31, 40, 2)),
                                gen_expected(range(0, 21, 2), range(31, 40)),
-                               gen_expected(range(1, 11, 2), range(11, 21, 2), range(21,31)),
+                               gen_expected(range(1, 11, 2), range(11, 21, 2), range(21, 31)),
                                gen_expected(range(21, 26, 2), range(26, 40))]
         expected_after_repair = [gen_expected(range(0, 26)),
                                  gen_expected(range(0, 21), range(31, 40)),
@@ -283,7 +289,6 @@ class TestTransientReplicationRing(Tester):
                                  gen_expected(range(26, 40))]
         self.move_test(move_token, expected_after_move, expected_after_repair)
 
-
     @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_move_forwards_and_cleanup(self):
@@ -299,7 +304,6 @@ class TestTransientReplicationRing(Tester):
                                  gen_expected(range(21, 40))]
         self.move_test(move_token, expected_after_move, expected_after_repair)
 
-
     @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_move_backwards_between_and_cleanup(self):
@@ -315,7 +319,6 @@ class TestTransientReplicationRing(Tester):
                                  gen_expected(range(31, 40))]
         self.move_test(move_token, expected_after_move, expected_after_repair)
 
-
     @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_move_backwards_and_cleanup(self):
@@ -331,14 +334,13 @@ class TestTransientReplicationRing(Tester):
                                  gen_expected(range(21, 40))]
         self.move_test(move_token, expected_after_move, expected_after_repair)
 
-
     @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_decommission(self):
         """Test decommissioning a node correctly streams out all the data"""
         node4 = new_node(self.cluster, bootstrap=True, token='00040')
         patch_start(node4)
-        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        node4.start(wait_for_binary_proto=True)
         main_session = self.patient_cql_connection(self.node1)
         nodes = [self.node1, self.node2, self.node3, node4]
 
@@ -357,7 +359,7 @@ class TestTransientReplicationRing(Tester):
             print("Inserting " + str(i))
             self.insert_row(i, i, i, main_session)
 
-        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        nodes[1].start(wait_for_binary_proto=True)
         sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3, node4]]
 
         expected = [gen_expected(range(0, 11), range(31, 40)),
@@ -367,7 +369,7 @@ class TestTransientReplicationRing(Tester):
 
         self.check_expected(sessions, expected)
 
-        #node1 has transient data we want to see streamed out on move
+        # node1 has transient data we want to see streamed out on decommission
         nodes[3].nodetool('decommission')
 
         nodes = nodes[:-1]
@@ -384,7 +386,7 @@ class TestTransientReplicationRing(Tester):
 
         repair_nodes(nodes)
 
-        #There should be no transient data anywhere
+        # There should be no transient data anywhere
         expected = [gen_expected(range(0, 11), range(21, 40)),
                     gen_expected(range(0, 21), range(31, 40)),
                     gen_expected(range(11, 31))]
@@ -392,24 +394,21 @@ class TestTransientReplicationRing(Tester):
         self.check_expected(sessions, expected, nodes, cleanup=True)
         self.check_replication(sessions, exactly=2)
 
-
-    @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_remove(self):
         """Test  a mix of ring change operations across a mix of transient and repaired/unrepaired data"""
         node4 = new_node(self.cluster, bootstrap=True, token='00040')
         patch_start(node4)
-        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        node4.start(wait_for_binary_proto=True)
         main_session = self.patient_cql_connection(self.node1)
         nodes = [self.node1, self.node2, self.node3]
 
-        #We want the node being removed to have no data on it so nodetool remove always gets all the necessary data
-        #from survivors
+        # We want the node being removed to have no data on it
+        # so nodetool remove always gets all the necessary data from survivors
         node4_id = node4.nodetool('info').stdout[25:61]
         node4.stop(wait_other_notice=True)
 
         for i in range(0, 40):
-            print("Inserting " + str(i))
             self.insert_row(i, i, i, main_session)
 
         sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
@@ -423,69 +422,58 @@ class TestTransientReplicationRing(Tester):
 
         nodes[0].nodetool('removenode ' + node4_id)
 
-        #Give streaming time to occur, it's asynchronous from removenode completing at other ndoes
+        # Give streaming time to occur; it's asynchronous with removenode completing at the other nodes
         import time
         time.sleep(15)
 
-        # Everyone should have everything except
-        expected = [gen_expected(range(0, 40)),
-                    gen_expected(range(0, 40)),
-                    gen_expected(range(0,40))]
+        self._everyone_should_have_everything(sessions)
 
-        self.check_replication(sessions, exactly=3)
-        self.check_expected(sessions, expected)
         repair_nodes(nodes)
         cleanup_nodes(nodes)
 
-        self.check_replication(sessions, exactly=2)
+        self._nodes_have_proper_ranges_after_repair_and_cleanup(sessions)
 
-        expected = [gen_expected(range(0,11), range(21,40)),
-                    gen_expected(range(0,21), range(31, 40)),
-                    gen_expected(range(11,31))]
-        self.check_expected(sessions, expected)
-
-    @flaky(max_runs=1)
     @pytest.mark.no_vnodes
     def test_replace(self):
         main_session = self.patient_cql_connection(self.node1)
 
-        #We want the node being replaced to have no data on it so the replacement definitely fetches all the data
+        # We want the node being replaced to have no data on it so the replacement definitely fetches all the data
         self.node2.stop(wait_other_notice=True)
 
         for i in range(0, 40):
-            print("Inserting " + str(i))
             self.insert_row(i, i, i, main_session)
 
         replacement_address = self.node2.address()
         self.node2.stop(wait_other_notice=True)
         self.cluster.remove(self.node2)
         self.node2 = Node('replacement', cluster=self.cluster, auto_bootstrap=True,
-                                         thrift_interface=None, storage_interface=(replacement_address, 7000),
-                                         jmx_port='7400', remote_debug_port='0', initial_token=None, binary_interface=(replacement_address, 9042))
+                          thrift_interface=None, storage_interface=(replacement_address, 7000),
+                          jmx_port='7400', remote_debug_port='0', initial_token=None, binary_interface=(replacement_address, 9042))
         patch_start(self.node2)
         nodes = [self.node1, self.node2, self.node3]
         self.cluster.add(self.node2, False, data_center='datacenter1')
         jvm_args = ["-Dcassandra.replace_address=%s" % replacement_address,
                     "-Dcassandra.ring_delay_ms=10000",
                     "-Dcassandra.broadcast_interval_ms=10000"]
-        self.node2.start(jvm_args=jvm_args, wait_for_binary_proto=True, wait_other_notice=True)
+        self.node2.start(jvm_args=jvm_args, wait_for_binary_proto=True)
 
         sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
 
-        # Everyone should have everything
-        expected = [gen_expected(range(0, 40)),
-                    gen_expected(range(0, 40)),
-                    gen_expected(range(0,40))]
-
-        self.check_replication(sessions, exactly=3)
-        self.check_expected(sessions, expected)
+        self._everyone_should_have_everything(sessions)
 
         repair_nodes(nodes)
         cleanup_nodes(nodes)
 
-        self.check_replication(sessions, exactly=2)
+        self._nodes_have_proper_ranges_after_repair_and_cleanup(sessions)
 
-        expected = [gen_expected(range(0,11), range(21,40)),
-                    gen_expected(range(0,21), range(31, 40)),
-                    gen_expected(range(11,31))]
-        self.check_expected(sessions, expected)
\ No newline at end of file
+    def _everyone_should_have_everything(self, sessions):
+        expected = [gen_expected(range(0, 40))] * 3
+        self.check_replication(sessions, exactly=3)
+        self.check_expected(sessions, expected)
+
+    def _nodes_have_proper_ranges_after_repair_and_cleanup(self, sessions):
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+        self.check_replication(sessions, exactly=2)
+        self.check_expected(sessions, expected)