1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.committable.hbase;
19
20 import static org.testng.Assert.assertEquals;
21 import static org.testng.Assert.assertFalse;
22 import static org.testng.Assert.assertTrue;
23
24 import java.util.concurrent.Future;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HColumnDescriptor;
30 import org.apache.hadoop.hbase.HTableDescriptor;
31 import org.apache.hadoop.hbase.MiniHBaseCluster;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.Connection;
35 import org.apache.hadoop.hbase.client.ConnectionFactory;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.omid.committable.CommitTable;
40 import org.apache.omid.committable.CommitTable.Client;
41 import org.apache.omid.committable.CommitTable.CommitTimestamp;
42 import org.apache.omid.committable.CommitTable.Writer;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.testng.annotations.AfterClass;
46 import org.testng.annotations.AfterMethod;
47 import org.testng.annotations.BeforeClass;
48 import org.testng.annotations.BeforeMethod;
49 import org.testng.annotations.Test;
50
51 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
52 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
53
54 public class TestHBaseCommitTable {
55
56 private static final Logger LOG = LoggerFactory.getLogger(TestHBaseCommitTable.class);
57
58 private static final String TEST_TABLE = "TEST";
59
60 private static final TableName TABLE_NAME = TableName.valueOf(TEST_TABLE);
61
62 private static HBaseTestingUtility testutil;
63 private static MiniHBaseCluster hbasecluster;
64 protected static Configuration hbaseConf;
65 protected static Connection connection;
66 private byte[] commitTableFamily;
67 private byte[] lowWatermarkFamily;
68
69
70 @BeforeClass
71 public void setUpClass() throws Exception {
72
73 hbaseConf = HBaseConfiguration.create();
74 hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
75 DefaultHBaseCommitTableStorageModule module = new DefaultHBaseCommitTableStorageModule();
76 commitTableFamily = module.getFamilyName().getBytes();
77 lowWatermarkFamily = module.getLowWatermarkFamily().getBytes();
78 LOG.info("Create hbase");
79 testutil = new HBaseTestingUtility(hbaseConf);
80 hbasecluster = testutil.startMiniCluster(1);
81 connection = ConnectionFactory.createConnection(hbaseConf);
82 }
83
84 @AfterClass
85 public void tearDownClass() throws Exception {
86 if (hbasecluster != null) {
87 testutil.shutdownMiniCluster();
88 }
89 }
90
91 @BeforeMethod
92 public void setUp() throws Exception {
93 Admin admin = testutil.getHBaseAdmin();
94
95 if (!admin.tableExists(TableName.valueOf(TEST_TABLE))) {
96 HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
97
98 HColumnDescriptor datafam = new HColumnDescriptor(commitTableFamily);
99 datafam.setMaxVersions(Integer.MAX_VALUE);
100 desc.addFamily(datafam);
101
102 HColumnDescriptor lowWatermarkFam = new HColumnDescriptor(lowWatermarkFamily);
103 lowWatermarkFam.setMaxVersions(Integer.MAX_VALUE);
104 desc.addFamily(lowWatermarkFam);
105
106
107
108 admin.createTable(desc);
109 }
110
111 if (admin.isTableDisabled(TableName.valueOf(TEST_TABLE))) {
112 admin.enableTable(TableName.valueOf(TEST_TABLE));
113 }
114 HTableDescriptor[] tables = admin.listTables();
115 for (HTableDescriptor t : tables) {
116 LOG.info(t.getNameAsString());
117 }
118 }
119
120 @AfterMethod
121 public void tearDown() {
122 try {
123 LOG.info("tearing Down");
124 Admin admin = testutil.getHBaseAdmin();
125 admin.disableTable(TableName.valueOf(TEST_TABLE));
126 admin.deleteTable(TableName.valueOf(TEST_TABLE));
127
128 } catch (Exception e) {
129 LOG.error("Error tearing down", e);
130 }
131 }
132
133 @Test(timeOut = 30_000)
134 public void testBasicBehaviour() throws Throwable {
135 HBaseCommitTableConfig config = new HBaseCommitTableConfig();
136 config.setTableName(TEST_TABLE);
137 HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
138
139 Writer writer = commitTable.getWriter();
140 Client client = commitTable.getClient();
141
142
143 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
144
145
146 for (int i = 0; i < 1000; i+=CommitTable.MAX_CHECKPOINTS_PER_TXN) {
147 writer.addCommittedTransaction(i, i + 1);
148 }
149 writer.flush();
150 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000/CommitTable.MAX_CHECKPOINTS_PER_TXN, "Rows should be 1000!");
151
152
153 for (long i = 0; i < 1000; i++) {
154 Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(i).get();
155 assertTrue(commitTimestamp.isPresent());
156 assertTrue(commitTimestamp.get().isValid());
157 long ct = commitTimestamp.get().getValue();
158 long expected = i - (i % CommitTable.MAX_CHECKPOINTS_PER_TXN) + 1;
159 assertEquals(ct, expected, "Commit timestamp should be " + expected);
160 }
161 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 1000/CommitTable.MAX_CHECKPOINTS_PER_TXN, "Rows should be 1000!");
162
163
164 Future<Void> f;
165 for (long i = 0; i < 1000; i+=CommitTable.MAX_CHECKPOINTS_PER_TXN) {
166 f = client.deleteCommitEntry(i);
167 f.get();
168 }
169 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
170
171
172 Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(0).get();
173 assertFalse(commitTimestamp.isPresent(), "Commit timestamp should not be present");
174
175
176 assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 0, "Rows should be 0!");
177
178
179 ListenableFuture<Long> lowWatermarkFuture = client.readLowWatermark();
180 assertEquals(lowWatermarkFuture.get(), Long.valueOf(0), "Low watermark should be 0");
181
182
183 for (int lowWatermark = 0; lowWatermark < 1000; lowWatermark++) {
184 writer.updateLowWatermark(lowWatermark);
185 }
186 writer.flush();
187 assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
188
189
190 lowWatermarkFuture = client.readLowWatermark();
191 long lowWatermark = lowWatermarkFuture.get();
192 assertEquals(lowWatermark, 999, "Low watermark should be 999");
193 assertEquals(rowCount(TABLE_NAME, lowWatermarkFamily), 1, "Should there be only one row!");
194
195 }
196
197
198 @Test(timeOut = 30_000)
199 public void testCheckpoints() throws Throwable {
200 HBaseCommitTableConfig config = new HBaseCommitTableConfig();
201 config.setTableName(TEST_TABLE);
202 HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
203
204 Writer writer = commitTable.getWriter();
205 Client client = commitTable.getClient();
206
207
208 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
209
210 long st = 0;
211 long ct = 1;
212
213
214 writer.addCommittedTransaction(st, ct);
215 writer.flush();
216
217 for (int i=0; i<CommitTable.MAX_CHECKPOINTS_PER_TXN;++i) {
218 Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(i).get();
219 assertTrue(commitTimestamp.isPresent());
220 assertTrue(commitTimestamp.get().isValid());
221 assertEquals(ct, commitTimestamp.get().getValue());
222 }
223
224
225 assertFalse(client.tryInvalidateTransaction(st + 1).get());
226
227 long st2 = 100;
228 long ct2 = 101;
229
230
231 assertTrue(client.tryInvalidateTransaction(st2 + 1).get());
232 assertFalse(writer.atomicAddCommittedTransaction(st2, ct2));
233
234
235 client.deleteCommitEntry(st2 + 1).get();
236
237 assertTrue(writer.atomicAddCommittedTransaction(st2, ct2));
238 }
239
240
241
242 @Test(timeOut = 30_000)
243 public void testTransactionInvalidation() throws Throwable {
244
245
246 final int TX1_ST = 0;
247 final int TX1_CT = TX1_ST + 1;
248 final int TX2_ST = 0 + CommitTable.MAX_CHECKPOINTS_PER_TXN;
249 final int TX2_CT = TX2_ST + 1;
250
251 HBaseCommitTableConfig config = new HBaseCommitTableConfig();
252 config.setTableName(TEST_TABLE);
253 HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
254
255
256 Writer writer = commitTable.getWriter();
257 Client client = commitTable.getClient();
258
259
260 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
261
262
263 writer.addCommittedTransaction(TX1_ST, TX1_CT);
264 writer.flush();
265 Optional<CommitTimestamp> commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
266 assertTrue(commitTimestamp.isPresent());
267 assertTrue(commitTimestamp.get().isValid());
268 long ct = commitTimestamp.get().getValue();
269 assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
270
271
272
273 boolean wasInvalidated = client.tryInvalidateTransaction(TX1_ST).get();
274 assertFalse(wasInvalidated, "Transaction should not be invalidated");
275
276 commitTimestamp = client.getCommitTimestamp(TX1_ST).get();
277 assertTrue(commitTimestamp.isPresent());
278 assertTrue(commitTimestamp.get().isValid());
279 ct = commitTimestamp.get().getValue();
280 assertEquals(ct, TX1_CT, "Commit timestamp should be " + TX1_CT);
281
282
283 wasInvalidated = client.tryInvalidateTransaction(TX2_ST).get();
284 assertTrue(wasInvalidated, "Transaction should be invalidated");
285 commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
286 assertTrue(commitTimestamp.isPresent());
287 assertFalse(commitTimestamp.get().isValid());
288 ct = commitTimestamp.get().getValue();
289 assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
290 "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
291
292
293 writer.addCommittedTransaction(TX2_ST, TX2_CT);
294 writer.flush();
295 commitTimestamp = client.getCommitTimestamp(TX2_ST).get();
296 assertTrue(commitTimestamp.isPresent());
297 assertFalse(commitTimestamp.get().isValid());
298 ct = commitTimestamp.get().getValue();
299 assertEquals(ct, CommitTable.INVALID_TRANSACTION_MARKER,
300 "Commit timestamp should be " + CommitTable.INVALID_TRANSACTION_MARKER);
301
302
303
304 assertEquals(rowCount(TABLE_NAME, commitTableFamily), 2, "Rows should be 2!");
305
306 }
307
308 private static long rowCount(TableName tableName, byte[] family) throws Throwable {
309 Scan scan = new Scan();
310 scan.addFamily(family);
311 Table table = connection.getTable(tableName);
312 try (ResultScanner scanner = table.getScanner(scan)) {
313 int count = 0;
314 while (scanner.next() != null) {
315 count++;
316 }
317 return count;
318 }
319 }
320
321 }