1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Mockito.doAnswer;
22 import static org.mockito.Mockito.spy;
23 import static org.mockito.Mockito.verify;
24 import static org.testng.Assert.assertEquals;
25 import static org.testng.Assert.assertFalse;
26 import static org.testng.Assert.assertTrue;
27
28 import java.io.IOException;
29 import java.util.Map;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.Coprocessor;
36 import org.apache.hadoop.hbase.HBaseTestingUtility;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.MiniHBaseCluster;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.client.Admin;
42 import org.apache.hadoop.hbase.client.Connection;
43 import org.apache.hadoop.hbase.client.ConnectionFactory;
44 import org.apache.hadoop.hbase.client.Delete;
45 import org.apache.hadoop.hbase.client.Get;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.ResultScanner;
49 import org.apache.hadoop.hbase.client.Scan;
50 import org.apache.hadoop.hbase.client.Table;
51 import org.apache.hadoop.hbase.filter.BinaryComparator;
52 import org.apache.hadoop.hbase.filter.CompareFilter;
53 import org.apache.hadoop.hbase.filter.FamilyFilter;
54 import org.apache.hadoop.hbase.filter.Filter;
55 import org.apache.hadoop.hbase.filter.FilterList;
56 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
57 import org.apache.hadoop.hbase.filter.SubstringComparator;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.omid.TestUtils;
60 import org.apache.omid.committable.CommitTable;
61 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
62 import org.apache.omid.metrics.NullMetricsProvider;
63 import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
64 import org.apache.omid.tso.TSOServer;
65 import org.apache.omid.tso.TSOServerConfig;
66 import org.mockito.Mockito;
67 import org.mockito.invocation.InvocationOnMock;
68 import org.mockito.stubbing.Answer;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 import org.testng.annotations.AfterClass;
72 import org.testng.annotations.BeforeClass;
73 import org.testng.annotations.BeforeMethod;
74 import org.testng.annotations.Test;
75 import static org.testng.Assert.fail;
76
77 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
78 import com.google.inject.Guice;
79 import com.google.inject.Injector;
80
81 public class TestSnapshotFilter {
82
83 private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilter.class);
84
85 private static final String TEST_FAMILY = "test-fam";
86
87 private static final int MAX_VERSIONS = 3;
88
89 private AbstractTransactionManager tm;
90
91 private Injector injector;
92
93 private Admin admin;
94 private Configuration hbaseConf;
95 private HBaseTestingUtility hbaseTestUtil;
96 private MiniHBaseCluster hbaseCluster;
97
98 private TSOServer tso;
99
100 private CommitTable commitTable;
101 private PostCommitActions syncPostCommitter;
102 private Connection connection;
103
104 @BeforeClass
105 public void setupTestSnapshotFilter() throws Exception {
106 TSOServerConfig tsoConfig = new TSOServerConfig();
107 tsoConfig.setPort(5679);
108 tsoConfig.setConflictMapSize(1);
109 tsoConfig.setWaitStrategy("LOW_CPU");
110 injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
111 hbaseConf = injector.getInstance(Configuration.class);
112 hbaseConf.setBoolean("omid.server.side.filter", true);
113 hbaseConf.setInt("hbase.hconnection.threads.core", 5);
114 hbaseConf.setInt("hbase.hconnection.threads.max", 10);
115
116 hbaseConf.setInt("hbase.regionserver.handler.count", 10);
117
118
119 hbaseConf.setInt("hbase.master.port", 0);
120 hbaseConf.setInt("hbase.master.info.port", 0);
121 hbaseConf.setInt("hbase.regionserver.port", 0);
122 hbaseConf.setInt("hbase.regionserver.info.port", 0);
123
124
125 HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
126 HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
127
128 setupHBase();
129 connection = ConnectionFactory.createConnection(hbaseConf);
130 admin = connection.getAdmin();
131 createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
132 setupTSO();
133
134 commitTable = injector.getInstance(CommitTable.class);
135 }
136
137 private void setupHBase() throws Exception {
138 LOG.info("--------------------------------------------------------------------------------------------------");
139 LOG.info("Setting up HBase");
140 LOG.info("--------------------------------------------------------------------------------------------------");
141 hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
142 LOG.info("--------------------------------------------------------------------------------------------------");
143 LOG.info("Creating HBase MiniCluster");
144 LOG.info("--------------------------------------------------------------------------------------------------");
145 hbaseCluster = hbaseTestUtil.startMiniCluster(1);
146 }
147
148 private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
149 HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
150 createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
151
152 createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
153 }
154
155 private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
156 if (!admin.tableExists(TableName.valueOf(tableName))) {
157 LOG.info("Creating {} table...", tableName);
158 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
159
160 for (byte[] family : families) {
161 HColumnDescriptor datafam = new HColumnDescriptor(family);
162 datafam.setMaxVersions(MAX_VERSIONS);
163 desc.addFamily(datafam);
164 }
165
166 int priority = Coprocessor.PRIORITY_HIGHEST;
167
168 desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
169 desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
170
171 admin.createTable(desc);
172 try {
173 hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
174 } catch (InterruptedException e) {
175 e.printStackTrace();
176 }
177 }
178
179 }
180
181 private void setupTSO() throws IOException, InterruptedException {
182 tso = injector.getInstance(TSOServer.class);
183 tso.startAsync();
184 tso.awaitRunning();
185 TestUtils.waitForSocketListening("localhost", 5679, 100);
186 Thread.currentThread().setName("UnitTest(s) thread");
187 }
188
189 @AfterClass
190 public void cleanupTestSnapshotFilter() throws Exception {
191 teardownTSO();
192 hbaseCluster.shutdown();
193 }
194
195 private void teardownTSO() throws IOException, InterruptedException {
196 tso.stopAsync();
197 tso.awaitTerminated();
198 TestUtils.waitForSocketNotListening("localhost", 5679, 1000);
199 }
200
201 @BeforeMethod
202 public void setupTestSnapshotFilterIndividualTest() throws Exception {
203 tm = spy((AbstractTransactionManager) newTransactionManager());
204 }
205
206 private TransactionManager newTransactionManager() throws Exception {
207 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
208 hbaseOmidClientConf.setConnectionString("localhost:5679");
209 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
210 CommitTable.Client commitTableClient = commitTable.getClient();
211 syncPostCommitter =
212 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
213 return HBaseTransactionManager.builder(hbaseOmidClientConf)
214 .postCommitter(syncPostCommitter)
215 .commitTableClient(commitTableClient)
216 .build();
217 }
218
219
220 @Test(timeOut = 60_000)
221 public void testGetFirstResult() throws Throwable {
222 byte[] rowName1 = Bytes.toBytes("row1");
223 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
224 byte[] colName1 = Bytes.toBytes("col1");
225 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
226
227 String TEST_TABLE = "testGetFirstResult";
228 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
229 TTable tt = new TTable(connection, TEST_TABLE);
230
231 Transaction tx1 = tm.begin();
232
233 Put row1 = new Put(rowName1);
234 row1.addColumn(famName1, colName1, dataValue1);
235 tt.put(tx1, row1);
236
237 tm.commit(tx1);
238
239 Transaction tx2 = tm.begin();
240
241 Get get = new Get(rowName1);
242 Result result = tt.get(tx2, get);
243
244 assertTrue(!result.isEmpty(), "Result should not be empty!");
245
246 long tsRow = result.rawCells()[0].getTimestamp();
247 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
248
249 tm.commit(tx2);
250
251 Transaction tx3 = tm.begin();
252
253 Put put3 = new Put(rowName1);
254 put3.addColumn(famName1, colName1, dataValue1);
255 tt.put(tx3, put3);
256
257 tm.commit(tx3);
258
259 Transaction tx4 = tm.begin();
260
261 Get get2 = new Get(rowName1);
262 Result result2 = tt.get(tx4, get2);
263
264 assertTrue(!result2.isEmpty(), "Result should not be empty!");
265
266 long tsRow2 = result2.rawCells()[0].getTimestamp();
267 assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version");
268
269 tm.commit(tx4);
270
271 tt.close();
272 }
273
274
275
276 @Test(timeOut = 60_000)
277 public void testServerSideSnapshotFiltering() throws Throwable {
278 byte[] rowName1 = Bytes.toBytes("row1");
279 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
280 byte[] colName1 = Bytes.toBytes("col1");
281 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
282 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
283
284 String TEST_TABLE = "testServerSideSnapshotFiltering";
285 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
286
287 TTable tt = new TTable(connection, TEST_TABLE);
288
289 Transaction tx1 = tm.begin();
290 Put put1 = new Put(rowName1);
291 put1.addColumn(famName1, colName1, dataValue1);
292 tt.put(tx1, put1);
293 tm.commit(tx1);
294
295 Transaction tx2 = tm.begin();
296 Put put2 = new Put(rowName1);
297 put2.addColumn(famName1, colName1, dataValue2);
298 tt.put(tx2, put2);
299
300 Transaction tx3 = tm.begin();
301 Get get = new Get(rowName1);
302
303
304
305 SingleColumnValueFilter filter = new SingleColumnValueFilter(
306 famName1,
307 colName1,
308 CompareFilter.CompareOp.EQUAL,
309 new SubstringComparator("testWrite-1"));
310
311 get.setFilter(filter);
312 Result results = tt.get(tx3, get);
313 assertTrue(results.size() == 1);
314 }
315
316
317
318 @Test(timeOut = 60_000)
319 public void testServerSideSnapshotScannerFiltering() throws Throwable {
320 byte[] rowName1 = Bytes.toBytes("row1");
321 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
322 byte[] colName1 = Bytes.toBytes("col1");
323 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
324 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
325
326 String TEST_TABLE = "testServerSideSnapshotFiltering";
327 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
328
329 TTable tt = new TTable(connection, TEST_TABLE);
330
331 Transaction tx1 = tm.begin();
332 Put put1 = new Put(rowName1);
333 put1.addColumn(famName1, colName1, dataValue1);
334 tt.put(tx1, put1);
335 tm.commit(tx1);
336
337 Transaction tx2 = tm.begin();
338 Put put2 = new Put(rowName1);
339 put2.addColumn(famName1, colName1, dataValue2);
340
341
342 Transaction tx3 = tm.begin();
343
344
345
346 SingleColumnValueFilter filter = new SingleColumnValueFilter(
347 famName1,
348 colName1,
349 CompareFilter.CompareOp.EQUAL,
350 new SubstringComparator("testWrite-1"));
351
352
353 Scan scan = new Scan();
354 scan.setFilter(filter);
355
356 ResultScanner iterableRS = tt.getScanner(tx3, scan);
357 Result result = iterableRS.next();
358
359 assertTrue(result.size() == 1);
360 }
361
362
363 @Test(timeOut = 60_000)
364 public void testGetWithFamilyDelete() throws Throwable {
365 byte[] rowName1 = Bytes.toBytes("row1");
366 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
367 byte[] famName2 = Bytes.toBytes("test-fam2");
368 byte[] colName1 = Bytes.toBytes("col1");
369 byte[] colName2 = Bytes.toBytes("col2");
370 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
371
372 String TEST_TABLE = "testGetWithFamilyDelete";
373 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
374
375 TTable tt = new TTable(connection, TEST_TABLE);
376
377 Transaction tx1 = tm.begin();
378
379 Put put1 = new Put(rowName1);
380 put1.addColumn(famName1, colName1, dataValue1);
381 tt.put(tx1, put1);
382
383 tm.commit(tx1);
384
385 Transaction tx2 = tm.begin();
386 Put put2 = new Put(rowName1);
387 put2.addColumn(famName2, colName2, dataValue1);
388 tt.put(tx2, put2);
389 tm.commit(tx2);
390
391 Transaction tx3 = tm.begin();
392
393 Delete d = new Delete(rowName1);
394 d.addFamily(famName2);
395 tt.delete(tx3, d);
396
397
398 Transaction tx4 = tm.begin();
399
400 Get get = new Get(rowName1);
401
402 Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
403 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
404 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
405
406 get.setFilter(filter1);
407 Result result = tt.get(tx4, get);
408 assertTrue(result.size() == 2, "Result should be 2");
409
410 try {
411 tm.commit(tx3);
412 } catch (RollbackException e) {
413 if (!tm.isLowLatency())
414 fail();
415 }
416 Transaction tx5 = tm.begin();
417 result = tt.get(tx5, get);
418 if (!tm.isLowLatency())
419 assertTrue(result.size() == 1, "Result should be 1");
420
421 tt.close();
422 }
423
424 @Test(timeOut = 60_000)
425 public void testReadFromCommitTable() throws Exception {
426 final byte[] rowName1 = Bytes.toBytes("row1");
427 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
428 byte[] colName1 = Bytes.toBytes("col1");
429 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
430 final String TEST_TABLE = "testReadFromCommitTable";
431 final byte[] famName2 = Bytes.toBytes("test-fam2");
432
433 final CountDownLatch readAfterCommit = new CountDownLatch(1);
434 final CountDownLatch postCommitBegin = new CountDownLatch(1);
435
436 final AtomicBoolean readFailed = new AtomicBoolean(false);
437 final AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager();
438 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
439
440 doAnswer(new Answer<ListenableFuture<Void>>() {
441 @Override
442 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
443 LOG.info("Releasing readAfterCommit barrier");
444 readAfterCommit.countDown();
445 LOG.info("Waiting postCommitBegin barrier");
446 postCommitBegin.await();
447 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
448 return result;
449 }
450 }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
451
452 Thread readThread = new Thread("Read Thread") {
453 @Override
454 public void run() {
455
456 try {
457 LOG.info("Waiting readAfterCommit barrier");
458 readAfterCommit.await();
459
460 Transaction tx4 = tm.begin();
461 TTable tt = new TTable(connection, TEST_TABLE);
462 Get get = new Get(rowName1);
463
464 Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
465 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
466 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
467
468 get.setFilter(filter1);
469 Result result = tt.get(tx4, get);
470
471 if (result.size() == 2) {
472 readFailed.set(false);
473 }
474 else {
475 readFailed.set(false);
476 }
477
478 postCommitBegin.countDown();
479 } catch (Throwable e) {
480 readFailed.set(false);
481 LOG.error("Error whilst reading", e);
482 }
483 }
484 };
485 readThread.start();
486
487 TTable table = new TTable(connection, TEST_TABLE);
488 final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
489 Put put1 = new Put(rowName1);
490 put1.addColumn(famName1, colName1, dataValue1);
491 table.put(t1, put1);
492 tm.commit(t1);
493
494 readThread.join();
495
496 assertFalse(readFailed.get(), "Read should have succeeded");
497
498 }
499
500
501
502 @Test(timeOut = 60_000)
503 public void testGetWithFilter() throws Throwable {
504 byte[] rowName1 = Bytes.toBytes("row1");
505 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
506 byte[] famName2 = Bytes.toBytes("test-fam2");
507 byte[] colName1 = Bytes.toBytes("col1");
508 byte[] colName2 = Bytes.toBytes("col2");
509 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
510
511 String TEST_TABLE = "testGetWithFilter";
512 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
513 TTable tt = new TTable(connection, TEST_TABLE);
514
515 Transaction tx1 = tm.begin();
516
517 Put put1 = new Put(rowName1);
518 put1.addColumn(famName1, colName1, dataValue1);
519 tt.put(tx1, put1);
520
521 tm.commit(tx1);
522
523 Transaction tx2 = tm.begin();
524 Put put2 = new Put(rowName1);
525 put2.addColumn(famName2, colName2, dataValue1);
526 tt.put(tx2, put2);
527 tm.commit(tx2);
528
529 Transaction tx3 = tm.begin();
530
531 Get get = new Get(rowName1);
532
533 Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
534 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
535 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
536
537 get.setFilter(filter1);
538 Result result = tt.get(tx3, get);
539 assertTrue(result.size() == 2, "Result should be 2");
540
541
542 Filter filter2 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
543 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))));
544
545 get.setFilter(filter2);
546 result = tt.get(tx3, get);
547 assertTrue(result.size() == 1, "Result should be 2");
548
549 tm.commit(tx3);
550
551 tt.close();
552 }
553
554
555 @Test(timeOut = 60_000)
556 public void testGetSecondResult() throws Throwable {
557 byte[] rowName1 = Bytes.toBytes("row1");
558 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
559 byte[] colName1 = Bytes.toBytes("col1");
560 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
561
562 String TEST_TABLE = "testGetSecondResult";
563 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
564 TTable tt = new TTable(connection, TEST_TABLE);
565
566 Transaction tx1 = tm.begin();
567
568 Put put1 = new Put(rowName1);
569 put1.addColumn(famName1, colName1, dataValue1);
570 tt.put(tx1, put1);
571
572 tm.commit(tx1);
573
574 Transaction tx2 = tm.begin();
575 Put put2 = new Put(rowName1);
576 put2.addColumn(famName1, colName1, dataValue1);
577 tt.put(tx2, put2);
578
579 Transaction tx3 = tm.begin();
580
581 Get get = new Get(rowName1);
582 Result result = tt.get(tx3, get);
583
584 assertTrue(!result.isEmpty(), "Result should not be empty!");
585
586 long tsRow = result.rawCells()[0].getTimestamp();
587 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
588
589 tm.commit(tx3);
590
591 tt.close();
592 }
593
594 @Test(timeOut = 60_000)
595 public void testScanFirstResult() throws Throwable {
596
597 byte[] rowName1 = Bytes.toBytes("row1");
598 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
599 byte[] colName1 = Bytes.toBytes("col1");
600 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
601
602 String TEST_TABLE = "testScanFirstResult";
603 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
604 TTable tt = new TTable(connection, TEST_TABLE);
605
606 Transaction tx1 = tm.begin();
607
608 Put row1 = new Put(rowName1);
609 row1.addColumn(famName1, colName1, dataValue1);
610 tt.put(tx1, row1);
611
612 tm.commit(tx1);
613
614 Transaction tx2 = tm.begin();
615
616 ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1));
617 Result result = iterableRS.next();
618 long tsRow = result.rawCells()[0].getTimestamp();
619 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
620
621 assertFalse(iterableRS.next() != null);
622
623 tm.commit(tx2);
624
625 Transaction tx3 = tm.begin();
626
627 Put put3 = new Put(rowName1);
628 put3.addColumn(famName1, colName1, dataValue1);
629 tt.put(tx3, put3);
630
631 tm.commit(tx3);
632
633 Transaction tx4 = tm.begin();
634
635 ResultScanner iterableRS2 = tt.getScanner(tx4, new Scan().setStartRow(rowName1).setStopRow(rowName1));
636 Result result2 = iterableRS2.next();
637 long tsRow2 = result2.rawCells()[0].getTimestamp();
638 assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version");
639
640 assertFalse(iterableRS2.next() != null);
641
642 tm.commit(tx4);
643 tt.close();
644 }
645
646
647 @Test(timeOut = 60_000)
648 public void testScanWithFilter() throws Throwable {
649
650 byte[] rowName1 = Bytes.toBytes("row1");
651 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
652 byte[] famName2 = Bytes.toBytes("test-fam2");
653 byte[] colName1 = Bytes.toBytes("col1");
654 byte[] colName2 = Bytes.toBytes("col2");
655 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
656
657 String TEST_TABLE = "testScanWithFilter";
658 createTableIfNotExists(TEST_TABLE, famName1, famName2);
659 TTable tt = new TTable(connection, TEST_TABLE);
660
661 Transaction tx1 = tm.begin();
662 Put put1 = new Put(rowName1);
663 put1.addColumn(famName1, colName1, dataValue1);
664 tt.put(tx1, put1);
665 tm.commit(tx1);
666
667 Transaction tx2 = tm.begin();
668 Put put2 = new Put(rowName1);
669 put2.addColumn(famName2, colName2, dataValue1);
670 tt.put(tx2, put2);
671
672 tm.commit(tx2);
673 Transaction tx3 = tm.begin();
674
675 Scan scan = new Scan();
676 scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
677 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY)))));
678 scan.setStartRow(rowName1).setStopRow(rowName1);
679
680 ResultScanner iterableRS = tt.getScanner(tx3, scan);
681 Result result = iterableRS.next();
682 assertTrue(result.containsColumn(famName1, colName1));
683 assertFalse(result.containsColumn(famName2, colName2));
684
685 scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
686 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
687 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2))));
688
689 iterableRS = tt.getScanner(tx3, scan);
690 result = iterableRS.next();
691 assertTrue(result.containsColumn(famName1, colName1));
692 assertTrue(result.containsColumn(famName2, colName2));
693
694 tm.commit(tx3);
695 tt.close();
696 }
697
698
699 @Test(timeOut = 60_000)
700 public void testScanSecondResult() throws Throwable {
701
702 byte[] rowName1 = Bytes.toBytes("row1");
703 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
704 byte[] colName1 = Bytes.toBytes("col1");
705 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
706
707 String TEST_TABLE = "testScanSecondResult";
708 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
709 TTable tt = new TTable(connection, TEST_TABLE);
710
711 Transaction tx1 = tm.begin();
712
713 Put put1 = new Put(rowName1);
714 put1.addColumn(famName1, colName1, dataValue1);
715 tt.put(tx1, put1);
716
717 tm.commit(tx1);
718
719 Transaction tx2 = tm.begin();
720
721 Put put2 = new Put(rowName1);
722 put2.addColumn(famName1, colName1, dataValue1);
723 tt.put(tx2, put2);
724
725 Transaction tx3 = tm.begin();
726
727 ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName1));
728 Result result = iterableRS.next();
729 long tsRow = result.rawCells()[0].getTimestamp();
730 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
731
732 assertFalse(iterableRS.next() != null);
733
734 tm.commit(tx3);
735
736 tt.close();
737 }
738
739 @Test (timeOut = 60_000)
740 public void testScanFewResults() throws Throwable {
741
742 byte[] rowName1 = Bytes.toBytes("row1");
743 byte[] rowName2 = Bytes.toBytes("row2");
744 byte[] rowName3 = Bytes.toBytes("row3");
745 byte[] famName = Bytes.toBytes(TEST_FAMILY);
746 byte[] colName1 = Bytes.toBytes("col1");
747 byte[] colName2 = Bytes.toBytes("col2");
748 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
749 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
750
751 String TEST_TABLE = "testScanFewResults";
752 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
753 TTable tt = new TTable(connection, TEST_TABLE);
754
755 Transaction tx1 = tm.begin();
756
757 Put put1 = new Put(rowName1);
758 put1.addColumn(famName, colName1, dataValue1);
759 tt.put(tx1, put1);
760
761 tm.commit(tx1);
762
763 Transaction tx2 = tm.begin();
764
765 Put put2 = new Put(rowName2);
766 put2.addColumn(famName, colName2, dataValue2);
767 tt.put(tx2, put2);
768
769 tm.commit(tx2);
770
771 Transaction tx3 = tm.begin();
772
773 ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
774 Result result = iterableRS.next();
775 long tsRow = result.rawCells()[0].getTimestamp();
776 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
777
778 result = iterableRS.next();
779 tsRow = result.rawCells()[0].getTimestamp();
780 assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version");
781
782 assertFalse(iterableRS.next() != null);
783
784 tm.commit(tx3);
785
786 tt.close();
787 }
788
789 @Test (timeOut = 60_000)
790 public void testScanFewResultsDifferentTransaction() throws Throwable {
791
792 byte[] rowName1 = Bytes.toBytes("row1");
793 byte[] rowName2 = Bytes.toBytes("row2");
794 byte[] rowName3 = Bytes.toBytes("row3");
795 byte[] famName = Bytes.toBytes(TEST_FAMILY);
796 byte[] colName1 = Bytes.toBytes("col1");
797 byte[] colName2 = Bytes.toBytes("col2");
798 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
799 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
800
801 String TEST_TABLE = "testScanFewResultsDifferentTransaction";
802 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
803 TTable tt = new TTable(connection, TEST_TABLE);
804
805 Transaction tx1 = tm.begin();
806
807 Put put1 = new Put(rowName1);
808 put1.addColumn(famName, colName1, dataValue1);
809 tt.put(tx1, put1);
810 Put put2 = new Put(rowName2);
811 put2.addColumn(famName, colName2, dataValue2);
812 tt.put(tx1, put2);
813
814 tm.commit(tx1);
815
816 Transaction tx2 = tm.begin();
817
818 put2 = new Put(rowName2);
819 put2.addColumn(famName, colName2, dataValue2);
820 tt.put(tx2, put2);
821
822 tm.commit(tx2);
823
824 Transaction tx3 = tm.begin();
825
826 ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
827 Result result = iterableRS.next();
828 long tsRow = result.rawCells()[0].getTimestamp();
829 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
830
831 result = iterableRS.next();
832 tsRow = result.rawCells()[0].getTimestamp();
833 assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version");
834
835 assertFalse(iterableRS.next() != null);
836
837 tm.commit(tx3);
838
839 tt.close();
840 }
841
842 @Test (timeOut = 60_000)
843 public void testScanFewResultsSameTransaction() throws Throwable {
844
845 byte[] rowName1 = Bytes.toBytes("row1");
846 byte[] rowName2 = Bytes.toBytes("row2");
847 byte[] rowName3 = Bytes.toBytes("row3");
848 byte[] famName = Bytes.toBytes(TEST_FAMILY);
849 byte[] colName1 = Bytes.toBytes("col1");
850 byte[] colName2 = Bytes.toBytes("col2");
851 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
852 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
853
854 String TEST_TABLE = "testScanFewResultsSameTransaction";
855 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
856 TTable tt = new TTable(connection, TEST_TABLE);
857
858 Transaction tx1 = tm.begin();
859
860 Put put1 = new Put(rowName1);
861 put1.addColumn(famName, colName1, dataValue1);
862 tt.put(tx1, put1);
863 Put put2 = new Put(rowName2);
864 put2.addColumn(famName, colName2, dataValue2);
865 tt.put(tx1, put2);
866
867 tm.commit(tx1);
868
869 Transaction tx2 = tm.begin();
870
871 put2 = new Put(rowName2);
872 put2.addColumn(famName, colName2, dataValue2);
873 tt.put(tx2, put2);
874
875 Transaction tx3 = tm.begin();
876
877 ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
878 Result result = iterableRS.next();
879 long tsRow = result.rawCells()[0].getTimestamp();
880 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
881
882 result = iterableRS.next();
883 tsRow = result.rawCells()[0].getTimestamp();
884 assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
885
886 assertFalse(iterableRS.next() != null);
887
888 tm.commit(tx3);
889
890 tt.close();
891 }
892
893
894 @Test (timeOut = 60_000)
895 public void testFilterCommitCacheInSnapshot() throws Throwable {
896 String TEST_TABLE = "testFilterCommitCacheInSnapshot";
897 byte[] rowName = Bytes.toBytes("row1");
898 byte[] famName = Bytes.toBytes(TEST_FAMILY);
899
900 createTableIfNotExists(TEST_TABLE, famName);
901 TTable tt = new TTable(connection, TEST_TABLE);
902
903 Transaction tx1 = tm.begin();
904 Put put = new Put(rowName);
905 for (int i = 0; i < 200; ++i) {
906 byte[] dataValue1 = Bytes.toBytes("some data");
907 byte[] colName = Bytes.toBytes("col" + i);
908 put.addColumn(famName, colName, dataValue1);
909 }
910 tt.put(tx1, put);
911 tm.commit(tx1);
912 Transaction tx3 = tm.begin();
913
914 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
915 SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
916 tm.getCommitTableClient()));
917 Filter newFilter = TransactionFilters.getVisibilityFilter(null,
918 snapshotFilter, (HBaseTransaction) tx3);
919
920 Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
921
922 Scan scan = new Scan();
923 ResultScanner scanner = rawTable.getScanner(scan);
924
925 for(Result row: scanner) {
926 for(Cell cell: row.rawCells()) {
927 newFilter.filterKeyValue(cell);
928
929 }
930 }
931 verify(snapshotFilter, Mockito.times(0))
932 .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
933 tm.commit(tx3);
934 tt.close();
935 }
936
937 @Test (timeOut = 60_000)
938 public void testFilterCommitCacheNotInSnapshot() throws Throwable {
939 String TEST_TABLE = "testFilterCommitCacheNotInSnapshot";
940 byte[] rowName = Bytes.toBytes("row1");
941 byte[] famName = Bytes.toBytes(TEST_FAMILY);
942
943 createTableIfNotExists(TEST_TABLE, famName);
944 TTable tt = new TTable(connection, TEST_TABLE);
945
946
947
948 Transaction tx1 = tm.begin();
949 Put put = new Put(rowName);
950 for (int i = 0; i < 200; ++i) {
951 byte[] dataValue1 = Bytes.toBytes("some data");
952 byte[] colName = Bytes.toBytes("col" + i);
953 put.addColumn(famName, colName, dataValue1);
954 }
955 tt.put(tx1, put);
956
957
958 Transaction tx = tm.begin();
959 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
960 SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
961 tm.getCommitTableClient()));
962 Filter newFilter = TransactionFilters.getVisibilityFilter(null,
963 snapshotFilter, (HBaseTransaction) tx);
964
965 Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
966
967 Scan scan = new Scan();
968 ResultScanner scanner = rawTable.getScanner(scan);
969
970 for(Result row: scanner) {
971 for(Cell cell: row.rawCells()) {
972 newFilter.filterKeyValue(cell);
973 }
974 }
975 verify(snapshotFilter, Mockito.times(1))
976 .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
977 tt.close();
978 }
979
980
981 }