View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Mockito.doAnswer;
22  import static org.mockito.Mockito.spy;
23  import static org.mockito.Mockito.verify;
24  import static org.testng.Assert.assertEquals;
25  import static org.testng.Assert.assertFalse;
26  import static org.testng.Assert.assertTrue;
27  
28  import java.io.IOException;
29  import java.util.Map;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.Coprocessor;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.MiniHBaseCluster;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.client.Admin;
42  import org.apache.hadoop.hbase.client.Connection;
43  import org.apache.hadoop.hbase.client.ConnectionFactory;
44  import org.apache.hadoop.hbase.client.Delete;
45  import org.apache.hadoop.hbase.client.Get;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.ResultScanner;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.client.Table;
51  import org.apache.hadoop.hbase.filter.BinaryComparator;
52  import org.apache.hadoop.hbase.filter.CompareFilter;
53  import org.apache.hadoop.hbase.filter.FamilyFilter;
54  import org.apache.hadoop.hbase.filter.Filter;
55  import org.apache.hadoop.hbase.filter.FilterList;
56  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
57  import org.apache.hadoop.hbase.filter.SubstringComparator;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.omid.TestUtils;
60  import org.apache.omid.committable.CommitTable;
61  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
62  import org.apache.omid.metrics.NullMetricsProvider;
63  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
64  import org.apache.omid.tso.TSOServer;
65  import org.apache.omid.tso.TSOServerConfig;
66  import org.mockito.Mockito;
67  import org.mockito.invocation.InvocationOnMock;
68  import org.mockito.stubbing.Answer;
69  import org.slf4j.Logger;
70  import org.slf4j.LoggerFactory;
71  import org.testng.annotations.AfterClass;
72  import org.testng.annotations.BeforeClass;
73  import org.testng.annotations.BeforeMethod;
74  import org.testng.annotations.Test;
75  import static org.testng.Assert.fail;
76  
77  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
78  import com.google.inject.Guice;
79  import com.google.inject.Injector;
80  
81  public class TestSnapshotFilter {
82  
83      private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilter.class);
84  
85      private static final String TEST_FAMILY = "test-fam";
86      
87      private static final int MAX_VERSIONS = 3;
88  
89      private AbstractTransactionManager tm;
90  
91      private Injector injector;
92  
93      private Admin admin;
94      private Configuration hbaseConf;
95      private HBaseTestingUtility hbaseTestUtil;
96      private MiniHBaseCluster hbaseCluster;
97  
98      private TSOServer tso;
99  
100     private CommitTable commitTable;
101     private PostCommitActions syncPostCommitter;
102     private Connection connection;
103 
104     @BeforeClass
105     public void setupTestSnapshotFilter() throws Exception {
106         TSOServerConfig tsoConfig = new TSOServerConfig();
107         tsoConfig.setPort(5679);
108         tsoConfig.setConflictMapSize(1);
109         tsoConfig.setWaitStrategy("LOW_CPU");
110         injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
111         hbaseConf = injector.getInstance(Configuration.class);
112         hbaseConf.setBoolean("omid.server.side.filter", true);
113         hbaseConf.setInt("hbase.hconnection.threads.core", 5);
114         hbaseConf.setInt("hbase.hconnection.threads.max", 10);
115         // Tunn down handler threads in regionserver
116         hbaseConf.setInt("hbase.regionserver.handler.count", 10);
117 
118         // Set to random port
119         hbaseConf.setInt("hbase.master.port", 0);
120         hbaseConf.setInt("hbase.master.info.port", 0);
121         hbaseConf.setInt("hbase.regionserver.port", 0);
122         hbaseConf.setInt("hbase.regionserver.info.port", 0);
123 
124 
125         HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
126         HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
127 
128         setupHBase();
129         connection = ConnectionFactory.createConnection(hbaseConf);
130         admin = connection.getAdmin();
131         createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
132         setupTSO();
133 
134         commitTable = injector.getInstance(CommitTable.class);
135     }
136 
137     private void setupHBase() throws Exception {
138         LOG.info("--------------------------------------------------------------------------------------------------");
139         LOG.info("Setting up HBase");
140         LOG.info("--------------------------------------------------------------------------------------------------");
141         hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
142         LOG.info("--------------------------------------------------------------------------------------------------");
143         LOG.info("Creating HBase MiniCluster");
144         LOG.info("--------------------------------------------------------------------------------------------------");
145         hbaseCluster = hbaseTestUtil.startMiniCluster(1);
146     }
147 
148     private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
149                                            HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
150         createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
151 
152         createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
153     }
154 
155     private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
156         if (!admin.tableExists(TableName.valueOf(tableName))) {
157             LOG.info("Creating {} table...", tableName);
158             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
159 
160             for (byte[] family : families) {
161                 HColumnDescriptor datafam = new HColumnDescriptor(family);
162                 datafam.setMaxVersions(MAX_VERSIONS);
163                 desc.addFamily(datafam);
164             }
165 
166             int priority = Coprocessor.PRIORITY_HIGHEST;
167 
168             desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
169             desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
170 
171             admin.createTable(desc);
172             try {
173                 hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
174             } catch (InterruptedException e) {
175                 e.printStackTrace();
176             }
177         }
178 
179     }
180 
181     private void setupTSO() throws IOException, InterruptedException {
182         tso = injector.getInstance(TSOServer.class);
183         tso.startAsync();
184         tso.awaitRunning();
185         TestUtils.waitForSocketListening("localhost", 5679, 100);
186         Thread.currentThread().setName("UnitTest(s) thread");
187     }
188 
189     @AfterClass
190     public void cleanupTestSnapshotFilter() throws Exception {
191         teardownTSO();
192         hbaseCluster.shutdown();
193     }
194 
195     private void teardownTSO() throws IOException, InterruptedException {
196         tso.stopAsync();
197         tso.awaitTerminated();
198         TestUtils.waitForSocketNotListening("localhost", 5679, 1000);
199     }
200 
201     @BeforeMethod
202     public void setupTestSnapshotFilterIndividualTest() throws Exception {
203         tm = spy((AbstractTransactionManager) newTransactionManager());
204     }
205 
206     private TransactionManager newTransactionManager() throws Exception {
207         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
208         hbaseOmidClientConf.setConnectionString("localhost:5679");
209         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
210         CommitTable.Client commitTableClient = commitTable.getClient();
211         syncPostCommitter =
212                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
213         return HBaseTransactionManager.builder(hbaseOmidClientConf)
214                 .postCommitter(syncPostCommitter)
215                 .commitTableClient(commitTableClient)
216                 .build();
217     }
218 
219 
220     @Test(timeOut = 60_000)
221     public void testGetFirstResult() throws Throwable {
222         byte[] rowName1 = Bytes.toBytes("row1");
223         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
224         byte[] colName1 = Bytes.toBytes("col1");
225         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
226 
227         String TEST_TABLE = "testGetFirstResult";
228         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
229         TTable tt = new TTable(connection, TEST_TABLE);
230 
231         Transaction tx1 = tm.begin();
232 
233         Put row1 = new Put(rowName1);
234         row1.addColumn(famName1, colName1, dataValue1);
235         tt.put(tx1, row1);
236      
237         tm.commit(tx1);
238 
239         Transaction tx2 = tm.begin();
240 
241         Get get = new Get(rowName1);
242         Result result = tt.get(tx2, get);
243 
244         assertTrue(!result.isEmpty(), "Result should not be empty!");
245 
246         long tsRow = result.rawCells()[0].getTimestamp();
247         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
248 
249         tm.commit(tx2);
250 
251         Transaction tx3 = tm.begin();
252 
253         Put put3 = new Put(rowName1);
254         put3.addColumn(famName1, colName1, dataValue1);
255         tt.put(tx3, put3);
256 
257         tm.commit(tx3);
258         
259         Transaction tx4 = tm.begin();
260 
261         Get get2 = new Get(rowName1);
262         Result result2 = tt.get(tx4, get2);
263 
264         assertTrue(!result2.isEmpty(), "Result should not be empty!");
265 
266         long tsRow2 = result2.rawCells()[0].getTimestamp();
267         assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version");
268 
269         tm.commit(tx4);
270 
271         tt.close();
272     }
273 
274 
275     // This test will fail if filtering is done before snapshot filtering
276     @Test(timeOut = 60_000)
277     public void testServerSideSnapshotFiltering() throws Throwable {
278         byte[] rowName1 = Bytes.toBytes("row1");
279         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
280         byte[] colName1 = Bytes.toBytes("col1");
281         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
282         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
283 
284         String TEST_TABLE = "testServerSideSnapshotFiltering";
285         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
286 
287         TTable tt = new TTable(connection, TEST_TABLE);
288 
289         Transaction tx1 = tm.begin();
290         Put put1 = new Put(rowName1);
291         put1.addColumn(famName1, colName1, dataValue1);
292         tt.put(tx1, put1);
293         tm.commit(tx1);
294 
295         Transaction tx2 = tm.begin();
296         Put put2 = new Put(rowName1);
297         put2.addColumn(famName1, colName1, dataValue2);
298         tt.put(tx2, put2);
299 
300         Transaction tx3 = tm.begin();
301         Get get = new Get(rowName1);
302 
303         // If snapshot filtering is not done in the server then the first value is
304         // "testWrite-2" and the whole row will be filtered out.
305         SingleColumnValueFilter filter = new SingleColumnValueFilter(
306                 famName1,
307                 colName1,
308                 CompareFilter.CompareOp.EQUAL,
309                 new SubstringComparator("testWrite-1"));
310 
311         get.setFilter(filter);
312         Result results = tt.get(tx3, get);
313         assertTrue(results.size() == 1);
314     }
315 
316 
317     // This test will fail if filtering is done before snapshot filtering
318     @Test(timeOut = 60_000)
319     public void testServerSideSnapshotScannerFiltering() throws Throwable {
320         byte[] rowName1 = Bytes.toBytes("row1");
321         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
322         byte[] colName1 = Bytes.toBytes("col1");
323         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
324         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
325 
326         String TEST_TABLE = "testServerSideSnapshotFiltering";
327         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
328 
329         TTable tt = new TTable(connection, TEST_TABLE);
330 
331         Transaction tx1 = tm.begin();
332         Put put1 = new Put(rowName1);
333         put1.addColumn(famName1, colName1, dataValue1);
334         tt.put(tx1, put1);
335         tm.commit(tx1);
336 
337         Transaction tx2 = tm.begin();
338         Put put2 = new Put(rowName1);
339         put2.addColumn(famName1, colName1, dataValue2);
340 //        tt.put(tx2, put2);
341 
342         Transaction tx3 = tm.begin();
343 
344         // If snapshot filtering is not done in the server then the first value is
345         // "testWrite-2" and the whole row will be filtered out.
346         SingleColumnValueFilter filter = new SingleColumnValueFilter(
347                 famName1,
348                 colName1,
349                 CompareFilter.CompareOp.EQUAL,
350                 new SubstringComparator("testWrite-1"));
351 
352 
353         Scan scan = new Scan();
354         scan.setFilter(filter);
355 
356         ResultScanner iterableRS = tt.getScanner(tx3, scan);
357         Result result = iterableRS.next();
358 
359         assertTrue(result.size() == 1);
360     }
361 
362 
363     @Test(timeOut = 60_000)
364     public void testGetWithFamilyDelete() throws Throwable {
365         byte[] rowName1 = Bytes.toBytes("row1");
366         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
367         byte[] famName2 = Bytes.toBytes("test-fam2");
368         byte[] colName1 = Bytes.toBytes("col1");
369         byte[] colName2 = Bytes.toBytes("col2");
370         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
371 
372         String TEST_TABLE = "testGetWithFamilyDelete";
373         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
374 
375         TTable tt = new TTable(connection, TEST_TABLE);
376 
377         Transaction tx1 = tm.begin();
378 
379         Put put1 = new Put(rowName1);
380         put1.addColumn(famName1, colName1, dataValue1);
381         tt.put(tx1, put1);
382 
383         tm.commit(tx1);
384 
385         Transaction tx2 = tm.begin();
386         Put put2 = new Put(rowName1);
387         put2.addColumn(famName2, colName2, dataValue1);
388         tt.put(tx2, put2);
389         tm.commit(tx2);
390 
391         Transaction tx3 = tm.begin();
392 
393         Delete d = new Delete(rowName1);
394         d.addFamily(famName2);
395         tt.delete(tx3, d);
396 
397 
398         Transaction tx4 = tm.begin();
399 
400         Get get = new Get(rowName1);
401 
402         Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
403                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
404                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
405 
406         get.setFilter(filter1);
407         Result result = tt.get(tx4, get);
408         assertTrue(result.size() == 2, "Result should be 2");
409 
410         try {
411             tm.commit(tx3);
412         } catch (RollbackException e) {
413             if (!tm.isLowLatency())
414                 fail();
415         }
416         Transaction tx5 = tm.begin();
417         result = tt.get(tx5, get);
418         if (!tm.isLowLatency())
419             assertTrue(result.size() == 1, "Result should be 1");
420 
421         tt.close();
422     }
423 
424     @Test(timeOut = 60_000)
425     public void testReadFromCommitTable() throws Exception {
426         final byte[] rowName1 = Bytes.toBytes("row1");
427         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
428         byte[] colName1 = Bytes.toBytes("col1");
429         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
430         final String TEST_TABLE = "testReadFromCommitTable";
431         final byte[] famName2 = Bytes.toBytes("test-fam2");
432 
433         final CountDownLatch readAfterCommit = new CountDownLatch(1);
434         final CountDownLatch postCommitBegin = new CountDownLatch(1);
435 
436         final AtomicBoolean readFailed = new AtomicBoolean(false);
437         final AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager();
438         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
439 
440         doAnswer(new Answer<ListenableFuture<Void>>() {
441             @Override
442             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
443                 LOG.info("Releasing readAfterCommit barrier");
444                 readAfterCommit.countDown();
445                 LOG.info("Waiting postCommitBegin barrier");
446                 postCommitBegin.await();
447                 ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
448                 return result;
449             }
450         }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
451 
452         Thread readThread = new Thread("Read Thread") {
453             @Override
454             public void run() {
455 
456                 try {
457                     LOG.info("Waiting readAfterCommit barrier");
458                     readAfterCommit.await();
459 
460                     Transaction tx4 = tm.begin();
461                     TTable tt = new TTable(connection, TEST_TABLE);
462                     Get get = new Get(rowName1);
463 
464                     Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
465                             new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
466                             new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
467 
468                     get.setFilter(filter1);
469                     Result result = tt.get(tx4, get);
470 
471                     if (result.size() == 2) {
472                         readFailed.set(false);
473                     }
474                     else {
475                         readFailed.set(false);
476                     }
477 
478                     postCommitBegin.countDown();
479                 } catch (Throwable e) {
480                     readFailed.set(false);
481                     LOG.error("Error whilst reading", e);
482                 }
483             }
484         };
485         readThread.start();
486 
487         TTable table = new TTable(connection, TEST_TABLE);
488         final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
489         Put put1 = new Put(rowName1);
490         put1.addColumn(famName1, colName1, dataValue1);
491         table.put(t1, put1);
492         tm.commit(t1);
493 
494         readThread.join();
495 
496         assertFalse(readFailed.get(), "Read should have succeeded");
497 
498     }
499 
500 
501 
502     @Test(timeOut = 60_000)
503     public void testGetWithFilter() throws Throwable {
504         byte[] rowName1 = Bytes.toBytes("row1");
505         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
506         byte[] famName2 = Bytes.toBytes("test-fam2");
507         byte[] colName1 = Bytes.toBytes("col1");
508         byte[] colName2 = Bytes.toBytes("col2");
509         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
510 
511         String TEST_TABLE = "testGetWithFilter";
512         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
513         TTable tt = new TTable(connection, TEST_TABLE);
514 
515         Transaction tx1 = tm.begin();
516 
517         Put put1 = new Put(rowName1);
518         put1.addColumn(famName1, colName1, dataValue1);
519         tt.put(tx1, put1);
520 
521         tm.commit(tx1);
522 
523         Transaction tx2 = tm.begin();
524         Put put2 = new Put(rowName1);
525         put2.addColumn(famName2, colName2, dataValue1);
526         tt.put(tx2, put2);
527         tm.commit(tx2);
528 
529         Transaction tx3 = tm.begin();
530 
531         Get get = new Get(rowName1);
532 
533         Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
534                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
535                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2)));
536 
537         get.setFilter(filter1);
538         Result result = tt.get(tx3, get);
539         assertTrue(result.size() == 2, "Result should be 2");
540 
541 
542         Filter filter2 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
543                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))));
544 
545         get.setFilter(filter2);
546         result = tt.get(tx3, get);
547         assertTrue(result.size() == 1, "Result should be 2");
548 
549         tm.commit(tx3);
550 
551         tt.close();
552     }
553 
554 
555     @Test(timeOut = 60_000)
556     public void testGetSecondResult() throws Throwable {
557         byte[] rowName1 = Bytes.toBytes("row1");
558         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
559         byte[] colName1 = Bytes.toBytes("col1");
560         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
561 
562         String TEST_TABLE = "testGetSecondResult";
563         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
564         TTable tt = new TTable(connection, TEST_TABLE);
565 
566         Transaction tx1 = tm.begin();
567 
568         Put put1 = new Put(rowName1);
569         put1.addColumn(famName1, colName1, dataValue1);
570         tt.put(tx1, put1);
571 
572         tm.commit(tx1);
573 
574         Transaction tx2 = tm.begin();
575         Put put2 = new Put(rowName1);
576         put2.addColumn(famName1, colName1, dataValue1);
577         tt.put(tx2, put2);
578 
579         Transaction tx3 = tm.begin();
580 
581         Get get = new Get(rowName1);
582         Result result = tt.get(tx3, get);
583 
584         assertTrue(!result.isEmpty(), "Result should not be empty!");
585 
586         long tsRow = result.rawCells()[0].getTimestamp();
587         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
588 
589         tm.commit(tx3);
590 
591         tt.close();
592     }
593 
594     @Test(timeOut = 60_000)
595     public void testScanFirstResult() throws Throwable {
596 
597         byte[] rowName1 = Bytes.toBytes("row1");
598         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
599         byte[] colName1 = Bytes.toBytes("col1");
600         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
601 
602         String TEST_TABLE = "testScanFirstResult";
603         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
604         TTable tt = new TTable(connection, TEST_TABLE);
605 
606         Transaction tx1 = tm.begin();
607 
608         Put row1 = new Put(rowName1);
609         row1.addColumn(famName1, colName1, dataValue1);
610         tt.put(tx1, row1);
611 
612         tm.commit(tx1);
613 
614         Transaction tx2 = tm.begin();
615 
616         ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1));
617         Result result = iterableRS.next();
618         long tsRow = result.rawCells()[0].getTimestamp();
619         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
620 
621         assertFalse(iterableRS.next() != null);
622 
623         tm.commit(tx2);
624 
625         Transaction tx3 = tm.begin();
626 
627         Put put3 = new Put(rowName1);
628         put3.addColumn(famName1, colName1, dataValue1);
629         tt.put(tx3, put3);
630 
631         tm.commit(tx3);
632 
633         Transaction tx4 = tm.begin();
634 
635         ResultScanner iterableRS2 = tt.getScanner(tx4, new Scan().setStartRow(rowName1).setStopRow(rowName1));
636         Result result2 = iterableRS2.next();
637         long tsRow2 = result2.rawCells()[0].getTimestamp();
638         assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version");
639 
640         assertFalse(iterableRS2.next() != null);
641 
642         tm.commit(tx4);
643         tt.close();
644     }
645 
646 
647     @Test(timeOut = 60_000)
648     public void testScanWithFilter() throws Throwable {
649 
650         byte[] rowName1 = Bytes.toBytes("row1");
651         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
652         byte[] famName2 = Bytes.toBytes("test-fam2");
653         byte[] colName1 = Bytes.toBytes("col1");
654         byte[] colName2 = Bytes.toBytes("col2");
655         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
656 
657         String TEST_TABLE = "testScanWithFilter";
658         createTableIfNotExists(TEST_TABLE, famName1, famName2);
659         TTable tt = new TTable(connection, TEST_TABLE);
660 
661         Transaction tx1 = tm.begin();
662         Put put1 = new Put(rowName1);
663         put1.addColumn(famName1, colName1, dataValue1);
664         tt.put(tx1, put1);
665         tm.commit(tx1);
666 
667         Transaction tx2 = tm.begin();
668         Put put2 = new Put(rowName1);
669         put2.addColumn(famName2, colName2, dataValue1);
670         tt.put(tx2, put2);
671 
672         tm.commit(tx2);
673         Transaction tx3 = tm.begin();
674 
675         Scan scan = new Scan();
676         scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
677                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY)))));
678         scan.setStartRow(rowName1).setStopRow(rowName1);
679 
680         ResultScanner iterableRS = tt.getScanner(tx3, scan);
681         Result result = iterableRS.next();
682         assertTrue(result.containsColumn(famName1, colName1));
683         assertFalse(result.containsColumn(famName2, colName2));
684 
685         scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
686                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(TEST_FAMILY))),
687                 new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(famName2))));
688 
689         iterableRS = tt.getScanner(tx3, scan);
690         result = iterableRS.next();
691         assertTrue(result.containsColumn(famName1, colName1));
692         assertTrue(result.containsColumn(famName2, colName2));
693 
694         tm.commit(tx3);
695         tt.close();
696     }
697 
698 
699     @Test(timeOut = 60_000)
700     public void testScanSecondResult() throws Throwable {
701 
702         byte[] rowName1 = Bytes.toBytes("row1");
703         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
704         byte[] colName1 = Bytes.toBytes("col1");
705         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
706 
707         String TEST_TABLE = "testScanSecondResult";
708         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
709         TTable tt = new TTable(connection, TEST_TABLE);
710 
711         Transaction tx1 = tm.begin();
712 
713         Put put1 = new Put(rowName1);
714         put1.addColumn(famName1, colName1, dataValue1);
715         tt.put(tx1, put1);
716 
717         tm.commit(tx1);
718 
719         Transaction tx2 = tm.begin();
720 
721         Put put2 = new Put(rowName1);
722         put2.addColumn(famName1, colName1, dataValue1);
723         tt.put(tx2, put2);
724 
725         Transaction tx3 = tm.begin();
726 
727         ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName1));
728         Result result = iterableRS.next();
729         long tsRow = result.rawCells()[0].getTimestamp();
730         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
731 
732         assertFalse(iterableRS.next() != null);
733 
734         tm.commit(tx3);
735 
736         tt.close();
737     }
738 
739     @Test (timeOut = 60_000)
740     public void testScanFewResults() throws Throwable {
741 
742         byte[] rowName1 = Bytes.toBytes("row1");
743         byte[] rowName2 = Bytes.toBytes("row2");
744         byte[] rowName3 = Bytes.toBytes("row3");
745         byte[] famName = Bytes.toBytes(TEST_FAMILY);
746         byte[] colName1 = Bytes.toBytes("col1");
747         byte[] colName2 = Bytes.toBytes("col2");
748         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
749         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
750 
751         String TEST_TABLE = "testScanFewResults";
752         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
753         TTable tt = new TTable(connection, TEST_TABLE);
754 
755         Transaction tx1 = tm.begin();
756 
757         Put put1 = new Put(rowName1);
758         put1.addColumn(famName, colName1, dataValue1);
759         tt.put(tx1, put1);
760 
761         tm.commit(tx1);
762 
763         Transaction tx2 = tm.begin();
764 
765         Put put2 = new Put(rowName2);
766         put2.addColumn(famName, colName2, dataValue2);
767         tt.put(tx2, put2);
768 
769         tm.commit(tx2);
770 
771         Transaction tx3 = tm.begin();
772 
773         ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
774         Result result = iterableRS.next();
775         long tsRow = result.rawCells()[0].getTimestamp();
776         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
777 
778         result = iterableRS.next();
779         tsRow = result.rawCells()[0].getTimestamp();
780         assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version");
781 
782         assertFalse(iterableRS.next() != null);
783 
784         tm.commit(tx3);
785 
786         tt.close();
787     }
788 
789     @Test (timeOut = 60_000)
790     public void testScanFewResultsDifferentTransaction() throws Throwable {
791 
792         byte[] rowName1 = Bytes.toBytes("row1");
793         byte[] rowName2 = Bytes.toBytes("row2");
794         byte[] rowName3 = Bytes.toBytes("row3");
795         byte[] famName = Bytes.toBytes(TEST_FAMILY);
796         byte[] colName1 = Bytes.toBytes("col1");
797         byte[] colName2 = Bytes.toBytes("col2");
798         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
799         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
800 
801         String TEST_TABLE = "testScanFewResultsDifferentTransaction";
802         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
803         TTable tt = new TTable(connection, TEST_TABLE);
804 
805         Transaction tx1 = tm.begin();
806 
807         Put put1 = new Put(rowName1);
808         put1.addColumn(famName, colName1, dataValue1);
809         tt.put(tx1, put1);
810         Put put2 = new Put(rowName2);
811         put2.addColumn(famName, colName2, dataValue2);
812         tt.put(tx1, put2);
813 
814         tm.commit(tx1);
815 
816         Transaction tx2 = tm.begin();
817 
818         put2 = new Put(rowName2);
819         put2.addColumn(famName, colName2, dataValue2);
820         tt.put(tx2, put2);
821 
822         tm.commit(tx2);
823 
824         Transaction tx3 = tm.begin();
825 
826         ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
827         Result result = iterableRS.next();
828         long tsRow = result.rawCells()[0].getTimestamp();
829         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
830 
831         result = iterableRS.next();
832         tsRow = result.rawCells()[0].getTimestamp();
833         assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version");
834 
835         assertFalse(iterableRS.next() != null);
836 
837         tm.commit(tx3);
838 
839         tt.close();
840     }
841 
842     @Test (timeOut = 60_000)
843     public void testScanFewResultsSameTransaction() throws Throwable {
844 
845         byte[] rowName1 = Bytes.toBytes("row1");
846         byte[] rowName2 = Bytes.toBytes("row2");
847         byte[] rowName3 = Bytes.toBytes("row3");
848         byte[] famName = Bytes.toBytes(TEST_FAMILY);
849         byte[] colName1 = Bytes.toBytes("col1");
850         byte[] colName2 = Bytes.toBytes("col2");
851         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
852         byte[] dataValue2 = Bytes.toBytes("testWrite-2");
853 
854         String TEST_TABLE = "testScanFewResultsSameTransaction";
855         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
856         TTable tt = new TTable(connection, TEST_TABLE);
857 
858         Transaction tx1 = tm.begin();
859 
860         Put put1 = new Put(rowName1);
861         put1.addColumn(famName, colName1, dataValue1);
862         tt.put(tx1, put1);
863         Put put2 = new Put(rowName2);
864         put2.addColumn(famName, colName2, dataValue2);
865         tt.put(tx1, put2);
866 
867         tm.commit(tx1);
868 
869         Transaction tx2 = tm.begin();
870 
871         put2 = new Put(rowName2);
872         put2.addColumn(famName, colName2, dataValue2);
873         tt.put(tx2, put2);
874 
875         Transaction tx3 = tm.begin();
876 
877         ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3));
878         Result result = iterableRS.next();
879         long tsRow = result.rawCells()[0].getTimestamp();
880         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
881 
882         result = iterableRS.next();
883         tsRow = result.rawCells()[0].getTimestamp();
884         assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version");
885 
886         assertFalse(iterableRS.next() != null);
887 
888         tm.commit(tx3);
889 
890         tt.close();
891     }
892 
893 
894     @Test (timeOut = 60_000)
895     public void testFilterCommitCacheInSnapshot() throws Throwable {
896         String TEST_TABLE = "testFilterCommitCacheInSnapshot";
897         byte[] rowName = Bytes.toBytes("row1");
898         byte[] famName = Bytes.toBytes(TEST_FAMILY);
899 
900         createTableIfNotExists(TEST_TABLE, famName);
901         TTable tt = new TTable(connection, TEST_TABLE);
902 
903         Transaction tx1 = tm.begin();
904         Put put = new Put(rowName);
905         for (int i = 0; i < 200; ++i) {
906             byte[] dataValue1 = Bytes.toBytes("some data");
907             byte[] colName = Bytes.toBytes("col" + i);
908             put.addColumn(famName, colName, dataValue1);
909         }
910         tt.put(tx1, put);
911         tm.commit(tx1);
912         Transaction tx3 = tm.begin();
913 
914         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
915         SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
916                 tm.getCommitTableClient()));
917         Filter newFilter = TransactionFilters.getVisibilityFilter(null,
918                 snapshotFilter, (HBaseTransaction) tx3);
919 
920         Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
921 
922         Scan scan = new Scan();
923         ResultScanner scanner = rawTable.getScanner(scan);
924 
925         for(Result row: scanner) {
926             for(Cell cell: row.rawCells()) {
927                 newFilter.filterKeyValue(cell);
928 
929             }
930         }
931         verify(snapshotFilter, Mockito.times(0))
932                 .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
933         tm.commit(tx3);
934         tt.close();
935     }
936 
937     @Test (timeOut = 60_000)
938     public void testFilterCommitCacheNotInSnapshot() throws Throwable {
939         String TEST_TABLE = "testFilterCommitCacheNotInSnapshot";
940         byte[] rowName = Bytes.toBytes("row1");
941         byte[] famName = Bytes.toBytes(TEST_FAMILY);
942 
943         createTableIfNotExists(TEST_TABLE, famName);
944         TTable tt = new TTable(connection, TEST_TABLE);
945 
946 
947         //add some uncommitted values
948         Transaction tx1 = tm.begin();
949         Put put = new Put(rowName);
950         for (int i = 0; i < 200; ++i) {
951             byte[] dataValue1 = Bytes.toBytes("some data");
952             byte[] colName = Bytes.toBytes("col" + i);
953             put.addColumn(famName, colName, dataValue1);
954         }
955         tt.put(tx1, put);
956 
957         //try to scan from tx
958         Transaction tx = tm.begin();
959         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
960         SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
961                 tm.getCommitTableClient()));
962         Filter newFilter = TransactionFilters.getVisibilityFilter(null,
963                 snapshotFilter, (HBaseTransaction) tx);
964 
965         Table rawTable = connection.getTable(TableName.valueOf(TEST_TABLE));
966 
967         Scan scan = new Scan();
968         ResultScanner scanner = rawTable.getScanner(scan);
969 
970         for(Result row: scanner) {
971             for(Cell cell: row.rawCells()) {
972                 newFilter.filterKeyValue(cell);
973             }
974         }
975         verify(snapshotFilter, Mockito.times(1))
976                 .getTSIfInSnapshot(any(Cell.class),any(HBaseTransaction.class), any(Map.class));
977         tt.close();
978     }
979 
980 
981 }