1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Matchers.anyLong;
21 import static org.mockito.Matchers.anySetOf;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.spy;
24 import static org.mockito.Mockito.timeout;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 import static org.testng.Assert.assertEquals;
28 import static org.testng.Assert.assertTrue;
29
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.omid.committable.CommitTable;
35 import org.apache.omid.tso.client.TSOClient;
36 import org.testng.ITestContext;
37 import org.testng.annotations.Test;
38
39
40 @Test(groups = "sharedHBase")
41 public class TestHBaseTransactionManager extends OmidTestBase {
42
43 private static final int FAKE_EPOCH_INCREMENT = 100;
44
45 private final byte[] row1 = Bytes.toBytes(TestHBaseTransactionManager.class.getCanonicalName());
46 private final byte[] testFamily = Bytes.toBytes(TEST_FAMILY);
47 private final byte[] qualifier = Bytes.toBytes("TEST_Q");
48 private final byte[] data1 = Bytes.toBytes("test_data1");
49
50
51 @Test(timeOut = 20_000)
52 public void testTxManagerGetsTimestampsInTheRightEpoch(ITestContext context) throws Exception {
53
54 TSOClient tsoClient = spy(getClient(context));
55
56 long fakeEpoch = tsoClient.getNewStartTimestamp().get() + (FAKE_EPOCH_INCREMENT * CommitTable.MAX_CHECKPOINTS_PER_TXN);
57
58
59 doReturn(fakeEpoch).when(tsoClient).getEpoch();
60
61 AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, tsoClient));
62
63
64 Transaction tx1 = tm.begin();
65 assertEquals(tx1.getTransactionId(), fakeEpoch);
66 verify(tsoClient, timeout(100).times(FAKE_EPOCH_INCREMENT)).getEpoch();
67
68 }
69
70 @Test(timeOut = 20_000)
71 public void testReadOnlyTransactionsDoNotContactTSOServer(ITestContext context) throws Exception {
72
73 final int EXPECTED_INVOCATIONS_FOR_COMMIT = 1;
74
75 TSOClient tsoClient = spy(getClient(context));
76 TransactionManager tm = newTransactionManager(context, tsoClient);
77
78 try (TTable txTable = new TTable(connection, TEST_TABLE)) {
79
80
81 Transaction tx1 = tm.begin();
82 Put put = new Put(row1);
83 put.addColumn(testFamily, qualifier, data1);
84 txTable.put(tx1, put);
85 tm.commit(tx1);
86
87 verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class));
88
89
90 AbstractTransaction readOnlyTx = (AbstractTransaction) tm.begin();
91 Get get = new Get(row1);
92 Result r = txTable.get(readOnlyTx, get);
93 assertTrue(Bytes.equals(r.getValue(testFamily, qualifier), data1), "Wrong value for RO-TX " + readOnlyTx);
94 assertTrue(readOnlyTx.getWriteSet().isEmpty());
95 tm.commit(readOnlyTx);
96
97 verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class), anySetOf(HBaseCellId.class));
98 assertEquals(readOnlyTx.getStatus(), Transaction.Status.COMMITTED_RO);
99 }
100
101 }
102
103 }