1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.omid.transaction.CellUtils.hasCell;
21 import static org.apache.omid.transaction.CellUtils.hasShadowCell;
22 import static org.junit.Assert.assertEquals;
23 import static org.mockito.Matchers.any;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.spy;
26 import static org.testng.Assert.assertTrue;
27
28 import java.io.IOException;
29 import java.util.List;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
34 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.Delete;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.Table;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.omid.committable.CommitTable;
45 import org.apache.omid.metrics.NullMetricsProvider;
46 import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
47 import org.junit.Assert;
48 import org.mockito.invocation.InvocationOnMock;
49 import org.mockito.stubbing.Answer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.testng.ITestContext;
53 import org.testng.annotations.Test;
54
55 @Test(groups = "sharedHBase")
56 public class TestCheckpoint extends OmidTestBase {
57
58 private static final Logger LOG = LoggerFactory.getLogger(TestCheckpoint.class);
59
60 private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) {
61 if (tx instanceof HBaseTransaction) {
62 return (HBaseTransaction) tx;
63 } else {
64 throw new IllegalArgumentException(
65 String.format("The transaction object passed %s is not an instance of HBaseTransaction",
66 tx.getClass().getName()));
67 }
68 }
69
70 @Test(timeOut = 30_000)
71 public void testFewCheckPoints(ITestContext context) throws Exception {
72
73 TransactionManager tm = newTransactionManager(context);
74 TTable tt = new TTable(connection, TEST_TABLE);
75
76 byte[] rowName1 = Bytes.toBytes("row1");
77 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
78 byte[] colName1 = Bytes.toBytes("col1");
79 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
80 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
81 byte[] dataValue3 = Bytes.toBytes("testWrite-3");
82
83 Transaction tx1 = tm.begin();
84
85 HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
86
87 Put row1 = new Put(rowName1);
88 row1.addColumn(famName1, colName1, dataValue1);
89 tt.put(tx1, row1);
90
91 Get g = new Get(rowName1).setMaxVersions(1);
92
93 Result r = tt.get(tx1, g);
94 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
95 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
96
97 hbaseTx1.checkpoint();
98
99 row1 = new Put(rowName1);
100 row1.addColumn(famName1, colName1, dataValue2);
101 tt.put(tx1, row1);
102
103 r = tt.get(tx1, g);
104 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
105 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
106
107 hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT);
108
109 r = tt.get(tx1, g);
110 assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
111 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
112
113 hbaseTx1.checkpoint();
114
115 row1 = new Put(rowName1);
116 row1.addColumn(famName1, colName1, dataValue3);
117 tt.put(tx1, row1);
118
119 r = tt.get(tx1, g);
120 assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
121 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
122
123 hbaseTx1.checkpoint();
124
125 r = tt.get(tx1, g);
126 assertTrue(Bytes.equals(dataValue3, r.getValue(famName1, colName1)),
127 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
128
129 hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL);
130
131 r = tt.get(tx1, g);
132
133 assertTrue(r.size() == 3, "Expected 3 results and found " + r.size());
134
135 List<Cell> cells = r.getColumnCells(famName1, colName1);
136 assertTrue(Bytes.equals(dataValue3, CellUtil.cloneValue(cells.get(0))),
137 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
138
139 assertTrue(Bytes.equals(dataValue2, CellUtil.cloneValue(cells.get(1))),
140 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
141
142 assertTrue(Bytes.equals(dataValue1, CellUtil.cloneValue(cells.get(2))),
143 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
144
145 tt.close();
146 }
147
148 @Test(timeOut = 30_000)
149 public void testSNAPSHOT(ITestContext context) throws Exception {
150 TransactionManager tm = newTransactionManager(context);
151 TTable tt = new TTable(connection, TEST_TABLE);
152
153 byte[] rowName1 = Bytes.toBytes("row1");
154 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
155 byte[] colName1 = Bytes.toBytes("col1");
156 byte[] dataValue0 = Bytes.toBytes("testWrite-0");
157 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
158 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
159
160 Transaction tx1 = tm.begin();
161
162 Put row1 = new Put(rowName1);
163 row1.addColumn(famName1, colName1, dataValue0);
164 tt.put(tx1, row1);
165
166 tm.commit(tx1);
167
168 tx1 = tm.begin();
169
170 HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
171
172 Get g = new Get(rowName1).setMaxVersions(1);
173
174 Result r = tt.get(tx1, g);
175 assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)),
176 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
177
178 row1 = new Put(rowName1);
179 row1.addColumn(famName1, colName1, dataValue1);
180 tt.put(tx1, row1);
181
182
183 r = tt.get(tx1, g);
184 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
185 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
186
187 hbaseTx1.checkpoint();
188
189 row1 = new Put(rowName1);
190 row1.addColumn(famName1, colName1, dataValue2);
191 tt.put(tx1, row1);
192
193 r = tt.get(tx1, g);
194 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
195 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
196
197 hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT);
198
199 r = tt.get(tx1, g);
200 assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)),
201 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
202
203 tt.close();
204 }
205
206 @Test(timeOut = 30_000)
207 public void testSNAPSHOT_ALL(ITestContext context) throws Exception {
208 TransactionManager tm = newTransactionManager(context);
209 TTable tt = new TTable(connection, TEST_TABLE);
210
211 byte[] rowName1 = Bytes.toBytes("row1");
212 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
213 byte[] colName1 = Bytes.toBytes("col1");
214 byte[] dataValue0 = Bytes.toBytes("testWrite-0");
215 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
216 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
217
218 Transaction tx1 = tm.begin();
219
220 Put row1 = new Put(rowName1);
221 row1.addColumn(famName1, colName1, dataValue0);
222 tt.put(tx1, row1);
223
224 tm.commit(tx1);
225
226 tx1 = tm.begin();
227
228 HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
229
230 Get g = new Get(rowName1).setMaxVersions(100);
231
232 Result r = tt.get(tx1, g);
233 assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)),
234 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
235
236 row1 = new Put(rowName1);
237 row1.addColumn(famName1, colName1, dataValue1);
238 tt.put(tx1, row1);
239
240 g = new Get(rowName1).setMaxVersions(100);
241
242 r = tt.get(tx1, g);
243 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
244 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
245
246 hbaseTx1.checkpoint();
247
248 row1 = new Put(rowName1);
249 row1.addColumn(famName1, colName1, dataValue2);
250 tt.put(tx1, row1);
251
252 r = tt.get(tx1, g);
253 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
254 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
255
256 hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL);
257
258 r = tt.get(tx1, g);
259
260 assertTrue(r.size() == 3, "Expected 3 results and found " + r.size());
261
262 List<Cell> cells = r.getColumnCells(famName1, colName1);
263 assertTrue(Bytes.equals(dataValue2, CellUtil.cloneValue(cells.get(0))),
264 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
265
266 assertTrue(Bytes.equals(dataValue1, CellUtil.cloneValue(cells.get(1))),
267 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
268
269 assertTrue(Bytes.equals(dataValue0, CellUtil.cloneValue(cells.get(2))),
270 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
271
272 tt.close();
273 }
274
275 @Test(timeOut = 30_000)
276 public void testSNAPSHOT_EXCLUDE_CURRENT(ITestContext context) throws Exception {
277 TransactionManager tm = newTransactionManager(context);
278 TTable tt = new TTable(connection, TEST_TABLE);
279
280 byte[] rowName1 = Bytes.toBytes("row1");
281 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
282 byte[] colName1 = Bytes.toBytes("col1");
283 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
284 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
285
286 Transaction tx1 = tm.begin();
287
288 HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
289
290 Put row1 = new Put(rowName1);
291 row1.addColumn(famName1, colName1, dataValue1);
292 tt.put(tx1, row1);
293
294 Get g = new Get(rowName1).setMaxVersions(1);
295
296 Result r = tt.get(tx1, g);
297 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
298 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
299
300 hbaseTx1.checkpoint();
301
302 row1 = new Put(rowName1);
303 row1.addColumn(famName1, colName1, dataValue2);
304 tt.put(tx1, row1);
305
306 r = tt.get(tx1, g);
307 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
308 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
309
310 hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
311
312 r = tt.get(tx1, g);
313 assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)),
314 "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1)));
315
316 tt.close();
317 }
318
319 @Test(timeOut = 30_000)
320 public void testDeleteAfterCheckpoint(ITestContext context) throws Exception {
321 TransactionManager tm = newTransactionManager(context);
322 TTable tt = new TTable(connection, TEST_TABLE);
323
324 byte[] rowName1 = Bytes.toBytes("row1");
325 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
326 byte[] colName1 = Bytes.toBytes("col1");
327 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
328
329 Transaction tx1 = tm.begin();
330
331 Put row1 = new Put(rowName1);
332 row1.addColumn(famName1, colName1, dataValue1);
333 tt.put(tx1, row1);
334
335 tm.commit(tx1);
336
337 Transaction tx2 = tm.begin();
338
339 HBaseTransaction hbaseTx2 = enforceHBaseTransactionAsParam(tx1);
340
341 hbaseTx2.checkpoint();
342
343 Delete d = new Delete(rowName1);
344 tt.delete(tx2, d);
345
346 try {
347 tm.commit(tx2);
348 } catch (TransactionException e) {
349 Assert.fail();
350 }
351
352 tt.close();
353 }
354
355 @Test(timeOut = 30_000)
356 public void testOutOfCheckpoints(ITestContext context) throws Exception {
357 TransactionManager tm = newTransactionManager(context);
358
359 Transaction tx1 = tm.begin();
360
361 HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1);
362
363 for (int i = 0; i < CommitTable.MAX_CHECKPOINTS_PER_TXN - 1; ++i) {
364 hbaseTx1.checkpoint();
365 }
366
367 try {
368 hbaseTx1.checkpoint();
369 Assert.fail();
370 } catch (TransactionException e) {
371
372 }
373
374 }
375
376
377 @Test(timeOut = 60_000)
378 public void testInMemoryCommitTableCheckpoints(ITestContext context) throws Exception {
379
380 final byte[] row = Bytes.toBytes("test-sc");
381 final byte[] family = Bytes.toBytes(TEST_FAMILY);
382 final byte[] qualifier = Bytes.toBytes("testdata");
383 final byte[] qualifier2 = Bytes.toBytes("testdata2");
384 final byte[] data1 = Bytes.toBytes("testWrite-");
385
386 final CountDownLatch beforeCTRemove = new CountDownLatch(1);
387 final CountDownLatch afterCommit = new CountDownLatch(1);
388 final CountDownLatch writerDone = new CountDownLatch(1);
389
390 final AtomicLong startTimestamp = new AtomicLong(0);
391 final AtomicLong commitTimestamp = new AtomicLong(0);
392 PostCommitActions syncPostCommitter =
393 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
394 final AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
395
396 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
397 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
398 tm.getCommitTableClient());
399 final TTable table = new TTable(htable,snapshotFilter);
400
401
402 doAnswer(new Answer<ListenableFuture<Void>>() {
403 @Override
404 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
405 afterCommit.countDown();
406 beforeCTRemove.await();
407 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
408 return result;
409 }
410 }).when(syncPostCommitter).removeCommitTableEntry(any(HBaseTransaction.class));
411
412
413 Thread writeThread = new Thread("WriteThread"){
414 @Override
415 public void run() {
416 try {
417
418 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
419 Put put = new Put(row);
420 put.addColumn(family, qualifier, data1);
421
422 startTimestamp.set(tx1.getStartTimestamp());
423 table.put(tx1, put);
424 tx1.checkpoint();
425
426 Put put2 = new Put(row);
427 put2.addColumn(family, qualifier2, data1);
428 table.put(tx1, put2);
429
430 tm.commit(tx1);
431
432 commitTimestamp.set(tx1.getCommitTimestamp());
433 writerDone.countDown();
434 } catch (IOException | RollbackException e) {
435 e.printStackTrace();
436 }
437 }
438 };
439
440 writeThread.start();
441
442 afterCommit.await();
443
444 Optional<CommitTable.CommitTimestamp> ct1 = tm.getCommitTableClient().getCommitTimestamp(startTimestamp.get()).get();
445 Optional<CommitTable.CommitTimestamp> ct2 = tm.getCommitTableClient().getCommitTimestamp(startTimestamp.get() + 1).get();
446
447 beforeCTRemove.countDown();
448
449 writerDone.await();
450
451 assertEquals(commitTimestamp.get(), ct1.get().getValue());
452 assertEquals(commitTimestamp.get(), ct2.get().getValue());
453
454
455 assertTrue(hasCell(row, family, qualifier, startTimestamp.get(), new TTableCellGetterAdapter(table)),
456 "Cell should be there");
457 assertTrue(hasCell(row, family, qualifier2, startTimestamp.get()+1, new TTableCellGetterAdapter(table)),
458 "Cell should be there");
459 assertTrue(hasShadowCell(row, family, qualifier, startTimestamp.get(), new TTableCellGetterAdapter(table)),
460 "Cell should be there");
461 assertTrue(hasShadowCell(row, family, qualifier2, startTimestamp.get()+1, new TTableCellGetterAdapter(table)),
462 "Cell should be there");
463 }
464 }