1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
21 import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_CF_NAME;
22 import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME;
23 import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.HA;
24 import static org.testng.Assert.assertEquals;
25 import static org.testng.Assert.assertTrue;
26 import static org.testng.Assert.fail;
27
28 import java.io.IOException;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.curator.RetryPolicy;
33 import org.apache.curator.framework.CuratorFramework;
34 import org.apache.curator.framework.CuratorFrameworkFactory;
35 import org.apache.curator.framework.recipes.cache.NodeCache;
36 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
37 import org.apache.curator.retry.ExponentialBackoffRetry;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.client.HBaseAdmin;
41 import org.apache.hadoop.hbase.client.Put;
42 import org.apache.hadoop.hbase.client.Result;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.omid.TestUtils;
45 import org.apache.omid.tso.LeaseManagement;
46 import org.apache.omid.tso.PausableLeaseManager;
47 import org.apache.omid.tso.TSOServer;
48 import org.apache.omid.tso.TSOServerConfig;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.testng.annotations.AfterMethod;
52 import org.testng.annotations.BeforeMethod;
53 import org.testng.annotations.Test;
54
55 import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
56 import com.google.inject.Guice;
57 import com.google.inject.Injector;
58
59 @Test(groups = "sharedHBase")
60 public class TestEndToEndScenariosWithHA extends OmidTestBase {
61
62 private static final int TEST_LEASE_PERIOD_MS = 5_000;
63 private static final String CURRENT_TSO_PATH = "/CURRENT_TSO_PATH";
64 private static final String TSO_LEASE_PATH = "/TSO_LEASE_PATH";
65 private static final String NAMESPACE = "omid";
66
67 private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndScenariosWithHA.class);
68
69 private static final byte[] qualifier1 = Bytes.toBytes("test-q1");
70 private static final byte[] qualifier2 = Bytes.toBytes("test-q2l");
71 private static final byte[] row1 = Bytes.toBytes("row1");
72 private static final byte[] row2 = Bytes.toBytes("row2");
73 private static final byte[] initialData = Bytes.toBytes("testWrite-0");
74 private static final byte[] data1_q1 = Bytes.toBytes("testWrite-1-q1");
75 private static final byte[] data1_q2 = Bytes.toBytes("testWrite-1-q2");
76 private static final byte[] data2_q1 = Bytes.toBytes("testWrite-2-q1");
77 private static final byte[] data2_q2 = Bytes.toBytes("testWrite-2-q2");
78 private static final int TSO1_PORT = 2223;
79 private static final int TSO2_PORT = 4321;
80
81 private CountDownLatch barrierTillTSOAddressPublication;
82
83 private CuratorFramework zkClient;
84
85 private TSOServer tso1;
86 private TSOServer tso2;
87
88 private PausableLeaseManager leaseManager1;
89
90 private TransactionManager tm;
91
92 @BeforeMethod(alwaysRun = true, timeOut = 30_000)
93 public void setup() throws Exception {
94
95 String zkConnection = "localhost:" + hBaseUtils.getZkCluster().getClientPort();
96
97 zkClient = provideInitializedZookeeperClient(zkConnection);
98
99
100 barrierTillTSOAddressPublication = new CountDownLatch(1);
101 final NodeCache currentTSOZNode = new NodeCache(zkClient, CURRENT_TSO_PATH);
102 currentTSOZNode.getListenable().addListener(new NodeCacheListener() {
103
104 @Override
105 public void nodeChanged() throws Exception {
106 byte[] currentTSOAndEpochAsBytes = currentTSOZNode.getCurrentData().getData();
107 String currentTSOAndEpoch = new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
108 if (currentTSOAndEpoch.endsWith("#0")) {
109 barrierTillTSOAddressPublication.countDown();
110 }
111 }
112
113 });
114 currentTSOZNode.start(true);
115
116
117 TSOServerConfig config1 = new TSOServerConfig();
118 config1.setPort(TSO1_PORT);
119 config1.setConflictMapSize(1000);
120 config1.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
121 Injector injector1 = Guice.createInjector(new TestTSOModule(hbaseConf, config1));
122 LOG.info("===================== Starting TSO 1 =====================");
123 tso1 = injector1.getInstance(TSOServer.class);
124 leaseManager1 = (PausableLeaseManager) injector1.getInstance(LeaseManagement.class);
125 tso1.startAsync();
126 tso1.awaitRunning();
127 TestUtils.waitForSocketListening("localhost", TSO1_PORT, 100);
128 LOG.info("================ Finished loading TSO 1 ==================");
129
130
131 TSOServerConfig config2 = new TSOServerConfig();
132 config2.setPort(TSO2_PORT);
133 config2.setConflictMapSize(1000);
134 config2.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
135 Injector injector2 = Guice.createInjector(new TestTSOModule(hbaseConf, config2));
136 LOG.info("===================== Starting TSO 2 =====================");
137 tso2 = injector2.getInstance(TSOServer.class);
138 injector2.getInstance(LeaseManagement.class);
139 tso2.startAsync();
140 tso2.awaitRunning();
141
142 LOG.info("================ Finished loading TSO 2 ==================");
143
144
145 barrierTillTSOAddressPublication.await();
146 currentTSOZNode.close();
147
148
149 LOG.info("===================== Starting TM =====================");
150 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
151 hbaseOmidClientConf.setConnectionType(HA);
152 hbaseOmidClientConf.setConnectionString(zkConnection);
153 hbaseOmidClientConf.getOmidClientConfiguration().setZkCurrentTsoPath(CURRENT_TSO_PATH);
154 hbaseOmidClientConf.getOmidClientConfiguration().setZkNamespace(NAMESPACE);
155 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
156 hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
157 tm = HBaseTransactionManager.builder(hbaseOmidClientConf).build();
158 LOG.info("===================== TM Started =========================");
159 }
160
161
162 @AfterMethod(alwaysRun = true, timeOut = 60_000)
163 public void cleanup() throws Exception {
164 LOG.info("Cleanup");
165 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
166 deleteTable(admin, TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
167 hBaseUtils.createTable(TableName.valueOf((DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME)),
168 new byte[][]{DEFAULT_TIMESTAMP_STORAGE_CF_NAME.getBytes()},
169 Integer.MAX_VALUE);
170 tso1.stopAsync();
171 tso1.awaitTerminated();
172 TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
173 tso2.stopAsync();
174 tso2.awaitTerminated();
175 TestUtils.waitForSocketNotListening("localhost", TSO2_PORT, 100);
176
177 zkClient.delete().forPath(TSO_LEASE_PATH);
178 LOG.info("ZKPath {} deleted", TSO_LEASE_PATH);
179 zkClient.delete().forPath(CURRENT_TSO_PATH);
180 LOG.info("ZKPaths {} deleted", CURRENT_TSO_PATH);
181
182 zkClient.close();
183 }
184
185
186
187
188
189
190
191
192
193
194
195
196
197 @Test(timeOut = 60_000)
198 public void testScenario1() throws Exception {
199 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
200
201
202 HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
203 long initialEpoch = tx0.getEpoch();
204 LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
205 Put putInitialDataRow1 = new Put(row1);
206 putInitialDataRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1, initialData);
207 txTable.put(tx0, putInitialDataRow1);
208 Put putInitialDataRow2 = new Put(row2);
209 putInitialDataRow2.addColumn(TEST_FAMILY.getBytes(), qualifier2, initialData);
210 txTable.put(tx0, putInitialDataRow2);
211 tm.commit(tx0);
212
213
214 checkRowValues(txTable, initialData, initialData);
215
216 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
217 LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
218 Bytes.toString(data1_q2));
219 Put putData1R1Q1 = new Put(row1);
220 putData1R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
221 txTable.put(tx1, putData1R1Q1);
222 Put putData1R2Q2 = new Put(row2);
223 putData1R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
224 txTable.put(tx1, putData1R2Q2);
225
226 Transaction interleavedReadTx = tm.begin();
227
228 LOG.info("Starting Interleaving Read Tx {} for checking cell values", interleavedReadTx.getTransactionId());
229
230
231 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
232 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
233 LOG.info("++++++++++++++++++++ PAUSING TSO 1 +++++++++++++++++++");
234 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
235 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
236 leaseManager1.pausedInStillInLeasePeriod();
237
238
239 Get getRow1 = new Get(row1).setMaxVersions(1);
240 getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
241 Result r = txTable.get(interleavedReadTx, getRow1);
242 assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
243 "Unexpected value for SI read R1Q1" + interleavedReadTx + ": "
244 + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
245
246
247 try {
248 tm.commit(tx1);
249 fail();
250 } catch (RollbackException e) {
251
252 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
253 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
254 assertEquals(tx1.getEpoch(), initialEpoch);
255 }
256
257
258 Get getRow2 = new Get(row2).setMaxVersions(1);
259 r = txTable.get(interleavedReadTx, getRow2);
260 assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
261 "Unexpected value for SI read R2Q2" + interleavedReadTx + ": "
262 + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
263
264
265 tm.commit(interleavedReadTx);
266 assertEquals(interleavedReadTx.getEpoch(), initialEpoch);
267 assertEquals(interleavedReadTx.getStatus(), Transaction.Status.COMMITTED_RO);
268
269 LOG.info("Wait till the client is informed about the connection parameters of the new TSO");
270 TestUtils.waitForSocketListening("localhost", TSO2_PORT, 100);
271
272 checkRowValues(txTable, initialData, initialData);
273
274
275 leaseManager1.resume();
276
277 }
278
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294 @Test(timeOut = 60_000)
295 public void testScenario2() throws Exception {
296 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
297
298
299 HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
300 long initialEpoch = tx0.getEpoch();
301 LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
302 Put putInitialDataRow1 = new Put(row1);
303 putInitialDataRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1, initialData);
304 txTable.put(tx0, putInitialDataRow1);
305 Put putInitialDataRow2 = new Put(row2);
306 putInitialDataRow2.addColumn(TEST_FAMILY.getBytes(), qualifier2, initialData);
307 txTable.put(tx0, putInitialDataRow2);
308 tm.commit(tx0);
309
310 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
311 LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
312 Bytes.toString(data1_q2));
313 Put putData1R1Q1 = new Put(row1);
314 putData1R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
315 txTable.put(tx1, putData1R1Q1);
316 Put putData1R2Q2 = new Put(row2);
317 putData1R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
318 txTable.put(tx1, putData1R2Q2);
319
320
321 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
322 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
323 LOG.info("++++++++++++++++++++ KILLING TSO 1 +++++++++++++++++++");
324 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
325 LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
326 tso1.stopAsync();
327 tso1.awaitTerminated();
328 TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
329
330
331 try {
332 tm.commit(tx1);
333 String failMsg = String.format("%s should not commit. Initial epoch was: %d", tx1, initialEpoch);
334 fail(failMsg);
335 } catch (RollbackException e) {
336
337 LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
338 assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
339 assertEquals(tx1.getEpoch(), initialEpoch);
340 }
341
342 LOG.info("Sleep some time till the client is informed about"
343 + "the new TSO connection parameters and how can connect");
344 TimeUnit.SECONDS.sleep(10 + 2);
345
346 HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
347 LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx2, Bytes.toString(data1_q1),
348 Bytes.toString(data1_q2));
349 Get getData1R1Q1 = new Get(row1).setMaxVersions(1);
350 Result r = txTable.get(tx2, getData1R1Q1);
351 assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
352 "Unexpected value for SI read R1Q1" + tx2 + ": "
353 + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
354 Get getData1R2Q2 = new Get(row2).setMaxVersions(1);
355 r = txTable.get(tx2, getData1R2Q2);
356 assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
357 "Unexpected value for SI read R1Q1" + tx2 + ": "
358 + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
359
360 Put putData2R1Q1 = new Put(row1);
361 putData2R1Q1.addColumn(TEST_FAMILY.getBytes(), qualifier1, data2_q1);
362 txTable.put(tx2, putData2R1Q1);
363 Put putData2R2Q2 = new Put(row2);
364 putData2R2Q2.addColumn(TEST_FAMILY.getBytes(), qualifier2, data2_q2);
365 txTable.put(tx2, putData2R2Q2);
366
367 tm.commit(tx2);
368
369 assertEquals(tx2.getStatus(), Transaction.Status.COMMITTED);
370 assertTrue(tx2.getEpoch() > tx0.getCommitTimestamp());
371
372 checkRowValues(txTable, data2_q1, data2_q2);
373 }
374
375 }
376
377 private void checkRowValues(TTable txTable, byte[] expectedDataR1Q1, byte[] expectedDataR2Q2)
378 throws IOException, RollbackException {
379 Transaction readTx = tm.begin();
380 LOG.info("Starting Read Tx {} for checking cell values", readTx.getTransactionId());
381 Get getRow1 = new Get(row1).setMaxVersions(1);
382 getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
383 Result r = txTable.get(readTx, getRow1);
384 assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), expectedDataR1Q1,
385 "Unexpected value for SI read R1Q1" + readTx + ": " + Bytes
386 .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
387 Get getRow2 = new Get(row2).setMaxVersions(1);
388 r = txTable.get(readTx, getRow2);
389 assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), expectedDataR2Q2,
390 "Unexpected value for SI read R2Q2" + readTx + ": " + Bytes
391 .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
392 tm.commit(readTx);
393 }
394
395
396
397
398
399 private static CuratorFramework provideInitializedZookeeperClient(String zkConnection) throws Exception {
400
401 LOG.info("Creating Zookeeper Client connecting to {}", zkConnection);
402
403 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
404 CuratorFramework zkClient = CuratorFrameworkFactory
405 .builder()
406 .namespace(NAMESPACE)
407 .connectString(zkConnection)
408 .retryPolicy(retryPolicy).build();
409
410 LOG.info("Connecting to ZK cluster {}", zkClient.getState());
411 zkClient.start();
412 zkClient.blockUntilConnected();
413 LOG.info("Connection to ZK cluster {}", zkClient.getState());
414
415 return zkClient;
416 }
417
418 }