View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
21  import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_CF_NAME;
22  import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME;
23  import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.HA;
24  import static org.testng.Assert.assertEquals;
25  import static org.testng.Assert.assertTrue;
26  import static org.testng.Assert.fail;
27  
28  import java.io.IOException;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.curator.RetryPolicy;
33  import org.apache.curator.framework.CuratorFramework;
34  import org.apache.curator.framework.CuratorFrameworkFactory;
35  import org.apache.curator.framework.recipes.cache.NodeCache;
36  import org.apache.curator.framework.recipes.cache.NodeCacheListener;
37  import org.apache.curator.retry.ExponentialBackoffRetry;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.HBaseAdmin;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.omid.TestUtils;
45  import org.apache.omid.tso.LeaseManagement;
46  import org.apache.omid.tso.PausableLeaseManager;
47  import org.apache.omid.tso.TSOServer;
48  import org.apache.omid.tso.TSOServerConfig;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  import org.testng.annotations.AfterMethod;
52  import org.testng.annotations.BeforeMethod;
53  import org.testng.annotations.Test;
54  
55  import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
56  import com.google.inject.Guice;
57  import com.google.inject.Injector;
58  
59  @Test(groups = "sharedHBase")
60  public class TestEndToEndScenariosWithHA extends OmidTestBase {
61  
62      private static final int TEST_LEASE_PERIOD_MS = 5_000;
63      private static final String CURRENT_TSO_PATH = "/CURRENT_TSO_PATH";
64      private static final String TSO_LEASE_PATH = "/TSO_LEASE_PATH";
65      private static final String NAMESPACE = "omid";
66  
67      private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndScenariosWithHA.class);
68  
69      private static final byte[] qualifier1 = Bytes.toBytes("test-q1");
70      private static final byte[] qualifier2 = Bytes.toBytes("test-q2l");
71      private static final byte[] row1 = Bytes.toBytes("row1");
72      private static final byte[] row2 = Bytes.toBytes("row2");
73      private static final byte[] initialData = Bytes.toBytes("testWrite-0");
74      private static final byte[] data1_q1 = Bytes.toBytes("testWrite-1-q1");
75      private static final byte[] data1_q2 = Bytes.toBytes("testWrite-1-q2");
76      private static final byte[] data2_q1 = Bytes.toBytes("testWrite-2-q1");
77      private static final byte[] data2_q2 = Bytes.toBytes("testWrite-2-q2");
78      private static final int TSO1_PORT = 2223;
79      private static final int TSO2_PORT = 4321;
80  
81      private CountDownLatch barrierTillTSOAddressPublication;
82  
83      private CuratorFramework zkClient;
84  
85      private TSOServer tso1;
86      private TSOServer tso2;
87  
88      private PausableLeaseManager leaseManager1;
89  
90      private TransactionManager tm;
91  
92      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
93      public void setup() throws Exception {
94          // Get the zkConnection string from minicluster
95          String zkConnection = "localhost:" + hBaseUtils.getZkCluster().getClientPort();
96  
97          zkClient = provideInitializedZookeeperClient(zkConnection);
98  
99          // Synchronize TSO start
100         barrierTillTSOAddressPublication = new CountDownLatch(1);
101         final NodeCache currentTSOZNode = new NodeCache(zkClient, CURRENT_TSO_PATH);
102         currentTSOZNode.getListenable().addListener(new NodeCacheListener() {
103 
104             @Override
105             public void nodeChanged() throws Exception {
106                 byte[] currentTSOAndEpochAsBytes = currentTSOZNode.getCurrentData().getData();
107                 String currentTSOAndEpoch = new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
108                 if (currentTSOAndEpoch.endsWith("#0")) { // Wait till a TSO instance publishes the epoch
109                     barrierTillTSOAddressPublication.countDown();
110                 }
111             }
112 
113         });
114         currentTSOZNode.start(true);
115 
116         // Configure TSO 1
117         TSOServerConfig config1 = new TSOServerConfig();
118         config1.setPort(TSO1_PORT);
119         config1.setConflictMapSize(1000);
120         config1.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
121         Injector injector1 = Guice.createInjector(new TestTSOModule(hbaseConf, config1));
122         LOG.info("===================== Starting TSO 1 =====================");
123         tso1 = injector1.getInstance(TSOServer.class);
124         leaseManager1 = (PausableLeaseManager) injector1.getInstance(LeaseManagement.class);
125         tso1.startAsync();
126         tso1.awaitRunning();
127         TestUtils.waitForSocketListening("localhost", TSO1_PORT, 100);
128         LOG.info("================ Finished loading TSO 1 ==================");
129 
130         // Configure TSO 2
131         TSOServerConfig config2 = new TSOServerConfig();
132         config2.setPort(TSO2_PORT);
133         config2.setConflictMapSize(1000);
134         config2.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
135         Injector injector2 = Guice.createInjector(new TestTSOModule(hbaseConf, config2));
136         LOG.info("===================== Starting TSO 2 =====================");
137         tso2 = injector2.getInstance(TSOServer.class);
138         injector2.getInstance(LeaseManagement.class);
139         tso2.startAsync();
140         tso2.awaitRunning();
141         // Don't do this here: TestUtils.waitForSocketListening("localhost", 4321, 100);
142         LOG.info("================ Finished loading TSO 2 ==================");
143 
144         // Wait till the master TSO is up
145         barrierTillTSOAddressPublication.await();
146         currentTSOZNode.close();
147 
148         // Configure HBase TM
149         LOG.info("===================== Starting TM =====================");
150         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
151         hbaseOmidClientConf.setConnectionType(HA);
152         hbaseOmidClientConf.setConnectionString(zkConnection);
153         hbaseOmidClientConf.getOmidClientConfiguration().setZkCurrentTsoPath(CURRENT_TSO_PATH);
154         hbaseOmidClientConf.getOmidClientConfiguration().setZkNamespace(NAMESPACE);
155         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
156         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
157         tm = HBaseTransactionManager.builder(hbaseOmidClientConf).build();
158         LOG.info("===================== TM Started =========================");
159     }
160 
161 
162     @AfterMethod(alwaysRun = true, timeOut = 60_000)
163     public void cleanup() throws Exception {
164         LOG.info("Cleanup");
165         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
166         deleteTable(admin, TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
167         hBaseUtils.createTable(TableName.valueOf((DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME)),
168                                new byte[][]{DEFAULT_TIMESTAMP_STORAGE_CF_NAME.getBytes()},
169                                Integer.MAX_VALUE);
170         tso1.stopAsync();
171         tso1.awaitTerminated();
172         TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
173         tso2.stopAsync();
174         tso2.awaitTerminated();
175         TestUtils.waitForSocketNotListening("localhost", TSO2_PORT, 100);
176 
177         zkClient.delete().forPath(TSO_LEASE_PATH);
178         LOG.info("ZKPath {} deleted", TSO_LEASE_PATH);
179         zkClient.delete().forPath(CURRENT_TSO_PATH);
180         LOG.info("ZKPaths {} deleted", CURRENT_TSO_PATH);
181 
182         zkClient.close();
183     }
184 
185     //
186     // TSO 1 is MASTER & TSO 2 is BACKUP
187     // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
188     // TX 1 starts (TSO1)
189     // TX 1 modifies cells R1C1 & R2C2 (v1)
190     // Interleaved Read TX -IR TX- starts (TSO1)
191     // TSO 1 PAUSES -> TSO 2 becomes MASTER
192     // IR TX reads R1C1 -> should get v0
193     // TX 1 tries to commit -> should abort because was started in TSO 1
194     // IR TX reads R2C2 -> should get v0
195     // IR TX tries to commit -> should abort because was started in TSO 1
196     // End of Test state: R1C1 & R2C2 (v0)
197     @Test(timeOut = 60_000)
198     public void testScenario1() throws Exception {
199         try (TTable txTable = new TTable(connection, TEST_TABLE)) {
200 
201             // Write initial values for the test
202             HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
203             long initialEpoch = tx0.getEpoch();
204             LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
205             Put putInitialDataRow1 = new Put(row1);
206             putInitialDataRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1, initialData);
207             txTable.put(tx0, putInitialDataRow1);
208             Put putInitialDataRow2 = new Put(row2);
209             putInitialDataRow2.addColumn(TEST_FAMILY.getBytes(), qualifier2, initialData);
210             txTable.put(tx0, putInitialDataRow2);
211             tm.commit(tx0);
212 
213             // Initial checks
214             checkRowValues(txTable, initialData, initialData);
215 
216             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
217             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
218                      Bytes.toString(data1_q2));
219             Put putData1R1Q1 = new Put(row1);
220             putData1R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
221             txTable.put(tx1, putData1R1Q1);
222             Put putData1R2Q2 = new Put(row2);
223             putData1R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
224             txTable.put(tx1, putData1R2Q2);
225 
226             Transaction interleavedReadTx = tm.begin();
227 
228             LOG.info("Starting Interleaving Read Tx {} for checking cell values", interleavedReadTx.getTransactionId());
229 
230             // Simulate a GC pause to change mastership (should throw a ServiceUnavailable exception)
231             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
232             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
233             LOG.info("++++++++++++++++++++ PAUSING TSO 1 +++++++++++++++++++");
234             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
235             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
236             leaseManager1.pausedInStillInLeasePeriod();
237 
238             // Read interleaved and check the values writen by tx 1
239             Get getRow1 = new Get(row1).setMaxVersions(1);
240             getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
241             Result r = txTable.get(interleavedReadTx, getRow1);
242             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
243                          "Unexpected value for SI read R1Q1" + interleavedReadTx + ": "
244                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
245 
246             // Try to commit, but it should abort due to the change in mastership
247             try {
248                 tm.commit(tx1);
249                 fail();
250             } catch (RollbackException e) {
251                 // Expected
252                 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
253                 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
254                 assertEquals(tx1.getEpoch(), initialEpoch);
255             }
256 
257             // Read interleaved and check the values written by tx 1
258             Get getRow2 = new Get(row2).setMaxVersions(1);
259             r = txTable.get(interleavedReadTx, getRow2);
260             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
261                          "Unexpected value for SI read R2Q2" + interleavedReadTx + ": "
262                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
263 
264             // Should commit because its a read only tx does not have to contact the TSO
265             tm.commit(interleavedReadTx);
266             assertEquals(interleavedReadTx.getEpoch(), initialEpoch);
267             assertEquals(interleavedReadTx.getStatus(), Transaction.Status.COMMITTED_RO);
268 
269             LOG.info("Wait till the client is informed about the connection parameters of the new TSO");
270             TestUtils.waitForSocketListening("localhost", TSO2_PORT, 100);
271 
272             checkRowValues(txTable, initialData, initialData);
273 
274             // Need to resume to let other test progress
275             leaseManager1.resume();
276 
277         }
278 
279     }
280 
281     //
282     // TSO 1 is MASTER & TSO 2 is BACKUP
283     // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
284     // TX 1 starts (TSO1)
285     // TX 1 modifies cells R1C1 & R2C2 (v1)
286     // TSO 1 is KILLED -> TSO 2 becomes MASTER
287     // TX 1 tries to commit -> should abort because was started in TSO 1
288     // TX 2 starts (TSO1)
289     // TX 2 reads R1C1 -> should get v0
290     // TX 2 reads R2C2 -> should get v0
291     // TX 2 modifies cells R1C1 & R2C2 (v2)
292     // TX 2 commits
293     // End of Test state: R1C1 & R2C2 (v2)
294     @Test(timeOut = 60_000)
295     public void testScenario2() throws Exception {
296         try (TTable txTable = new TTable(connection, TEST_TABLE)) {
297 
298             // Write initial values for the test
299             HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
300             long initialEpoch = tx0.getEpoch();
301             LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
302             Put putInitialDataRow1 = new Put(row1);
303             putInitialDataRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1, initialData);
304             txTable.put(tx0, putInitialDataRow1);
305             Put putInitialDataRow2 = new Put(row2);
306             putInitialDataRow2.addColumn(TEST_FAMILY.getBytes(), qualifier2, initialData);
307             txTable.put(tx0, putInitialDataRow2);
308             tm.commit(tx0);
309 
310             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
311             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
312                      Bytes.toString(data1_q2));
313             Put putData1R1Q1 = new Put(row1);
314             putData1R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
315             txTable.put(tx1, putData1R1Q1);
316             Put putData1R2Q2 = new Put(row2);
317             putData1R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
318             txTable.put(tx1, putData1R2Q2);
319 
320             // Provoke change in mastership (should throw a Connection exception)
321             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
322             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
323             LOG.info("++++++++++++++++++++ KILLING TSO 1 +++++++++++++++++++");
324             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
325             LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
326             tso1.stopAsync();
327             tso1.awaitTerminated();
328             TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
329 
330             // Try to commit, but it should abort due to the change in mastership
331             try {
332                 tm.commit(tx1);
333                 String failMsg = String.format("%s should not commit. Initial epoch was: %d", tx1, initialEpoch);
334                 fail(failMsg);
335             } catch (RollbackException e) {
336                 // Expected
337                 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
338                 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
339                 assertEquals(tx1.getEpoch(), initialEpoch);
340             }
341 
342             LOG.info("Sleep some time till the client is informed about"
343                              + "the new TSO connection parameters and how can connect");
344             TimeUnit.SECONDS.sleep(10 + 2);
345 
346             HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
347             LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx2, Bytes.toString(data1_q1),
348                      Bytes.toString(data1_q2));
349             Get getData1R1Q1 = new Get(row1).setMaxVersions(1);
350             Result r = txTable.get(tx2, getData1R1Q1);
351             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
352                          "Unexpected value for SI read R1Q1" + tx2 + ": "
353                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
354             Get getData1R2Q2 = new Get(row2).setMaxVersions(1);
355             r = txTable.get(tx2, getData1R2Q2);
356             assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
357                          "Unexpected value for SI read R1Q1" + tx2 + ": "
358                                  + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
359 
360             Put putData2R1Q1 = new Put(row1);
361             putData2R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data2_q1);
362             txTable.put(tx2, putData2R1Q1);
363             Put putData2R2Q2 = new Put(row2);
364             putData2R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data2_q2);
365             txTable.put(tx2, putData2R2Q2);
366             // This one should commit in the new TSO
367             tm.commit(tx2);
368 
369             assertEquals(tx2.getStatus(), Transaction.Status.COMMITTED);
370             assertTrue(tx2.getEpoch() > tx0.getCommitTimestamp());
371 
372             checkRowValues(txTable, data2_q1, data2_q2);
373         }
374 
375     }
376 
377     private void checkRowValues(TTable txTable, byte[] expectedDataR1Q1, byte[] expectedDataR2Q2)
378             throws IOException, RollbackException {
379         Transaction readTx = tm.begin();
380         LOG.info("Starting Read Tx {} for checking cell values", readTx.getTransactionId());
381         Get getRow1 = new Get(row1).setMaxVersions(1);
382         getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
383         Result r = txTable.get(readTx, getRow1);
384         assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), expectedDataR1Q1,
385                      "Unexpected value for SI read R1Q1" + readTx + ": " + Bytes
386                              .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
387         Get getRow2 = new Get(row2).setMaxVersions(1);
388         r = txTable.get(readTx, getRow2);
389         assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), expectedDataR2Q2,
390                      "Unexpected value for SI read R2Q2" + readTx + ": " + Bytes
391                              .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
392         tm.commit(readTx);
393     }
394 
395     // ----------------------------------------------------------------------------------------------------------------
396     // Helpers
397     // ----------------------------------------------------------------------------------------------------------------
398 
399     private static CuratorFramework provideInitializedZookeeperClient(String zkConnection) throws Exception {
400 
401         LOG.info("Creating Zookeeper Client connecting to {}", zkConnection);
402 
403         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
404         CuratorFramework zkClient = CuratorFrameworkFactory
405                 .builder()
406                 .namespace(NAMESPACE)
407                 .connectString(zkConnection)
408                 .retryPolicy(retryPolicy).build();
409 
410         LOG.info("Connecting to ZK cluster {}", zkClient.getState());
411         zkClient.start();
412         zkClient.blockUntilConnected();
413         LOG.info("Connection to ZK cluster {}", zkClient.getState());
414 
415         return zkClient;
416     }
417 
418 }