View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Mockito.doAnswer;
22  import static org.mockito.Mockito.spy;
23  import static org.testng.Assert.assertEquals;
24  import static org.testng.Assert.assertNull;
25  
26  import org.apache.hadoop.hbase.client.Get;
27  import org.apache.hadoop.hbase.client.Put;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.client.ResultScanner;
30  import org.apache.hadoop.hbase.client.Scan;
31  import org.apache.hadoop.hbase.filter.BinaryComparator;
32  import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
33  import org.apache.hadoop.hbase.filter.CompareFilter;
34  import org.apache.hadoop.hbase.filter.Filter;
35  import org.apache.hadoop.hbase.filter.ValueFilter;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.omid.committable.CommitTable;
38  import org.apache.omid.metrics.NullMetricsProvider;
39  import org.mockito.invocation.InvocationOnMock;
40  import org.mockito.stubbing.Answer;
41  import org.testng.ITestContext;
42  import org.testng.annotations.Test;
43  
44  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
45  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
46  
47  /**
48   * Tests to verify that Get and Scan filters still work with transactions tables
49   */
50  @Test(groups = "sharedHBase")
51  public class TestFilters extends OmidTestBase {
52  
53      byte[] family = Bytes.toBytes(TEST_FAMILY);
54      private byte[] row1 = Bytes.toBytes("row1");
55      private byte[] row2 = Bytes.toBytes("row2");
56      private byte[] row3 = Bytes.toBytes("row3");
57      private byte[] prefix = Bytes.toBytes("foo");
58      private byte[] col1 = Bytes.toBytes("foobar");
59      private byte[] col2 = Bytes.toBytes("boofar");
60  
61      @Test(timeOut = 60_000)
62      public void testGetWithColumnPrefixFilter(ITestContext context) throws Exception {
63          testGet(context, new ColumnPrefixFilter(prefix));
64      }
65  
66      @Test(timeOut = 60_000)
67      public void testGetWithValueFilter(ITestContext context) throws Exception {
68          testGet(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
69      }
70  
71      private void testGet(ITestContext context, Filter f) throws Exception {
72  
73          CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
74  
75          HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
76          hbaseOmidClientConf.setConnectionString("localhost:1234");
77          hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
78  
79          TTable table = new TTable(connection, TEST_TABLE);
80          PostCommitActions syncPostCommitter = spy(
81                  new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
82          AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
83                  .commitTableClient(commitTableClient)
84                  .commitTableWriter(getCommitTable(context).getWriter())
85                  .postCommitter(syncPostCommitter)
86                  .build();
87  
88          writeRows(table, tm, syncPostCommitter);
89  
90          Transaction t = tm.begin();
91          Get g = new Get(row1);
92          g.setFilter(f);
93  
94          Result r = table.get(t, g);
95          assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
96          assertEquals(r.getColumnCells(family, col2).size(), 0 , "shouldn't exist in result");
97  
98          g = new Get(row2);
99          g.setFilter(f);
100         r = table.get(t, g);
101         assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
102         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
103 
104         g = new Get(row3);
105         g.setFilter(f);
106         r = table.get(t, g);
107         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
108 
109     }
110 
111     @Test(timeOut = 60_000)
112     public void testScanWithColumnPrefixFilter(ITestContext context) throws Exception {
113         testScan(context, new ColumnPrefixFilter(prefix));
114     }
115 
116     @Test(timeOut = 60_000)
117     public void testScanWithValueFilter(ITestContext context) throws Exception {
118         testScan(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
119     }
120 
121     private void testScan(ITestContext context, Filter f) throws Exception {
122 
123         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
124 
125         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
126         hbaseOmidClientConf.getOmidClientConfiguration().setConnectionString("localhost:1234");
127         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
128         TTable table = new TTable(connection, TEST_TABLE);
129         PostCommitActions syncPostCommitter = spy(
130                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
131         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
132                 .commitTableClient(commitTableClient)
133                 .commitTableWriter(getCommitTable(context).getWriter())
134                 .postCommitter(syncPostCommitter)
135                 .build();
136 
137         writeRows(table, tm, syncPostCommitter);
138 
139         Transaction t = tm.begin();
140         Scan s = new Scan().setFilter(f);
141 
142         ResultScanner rs = table.getScanner(t, s);
143 
144         Result r = rs.next();
145         assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
146         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
147 
148         r = rs.next();
149         assertEquals(r.getColumnCells(family, col1).size(), 1, "should exist in result");
150         assertEquals(r.getColumnCells(family, col2).size(), 0, "shouldn't exist in result");
151 
152         r = rs.next();
153         assertNull(r, "Last row shouldn't exist");
154 
155     }
156 
157     private void writeRows(TTable table, TransactionManager tm, PostCommitActions postCommitter)
158             throws Exception {
159         // create normal row with both cells
160         Transaction t = tm.begin();
161         Put p = new Put(row1);
162         p.addColumn(family, col1, col1);
163         p.addColumn(family, col2, col2);
164         table.put(t, p);
165         tm.commit(t);
166 
167         // create normal row, but fail to update shadow cells
168         doAnswer(new Answer<ListenableFuture<Void>>() {
169             public ListenableFuture<Void> answer(InvocationOnMock invocation) {
170                 // Do not invoke the real method
171                 return SettableFuture.create();
172             }
173         }).when(postCommitter).updateShadowCells(any(HBaseTransaction.class));
174 
175         t = tm.begin();
176         p = new Put(row2);
177         p.addColumn(family, col1, col1);
178         p.addColumn(family, col2, col2);
179         table.put(t, p);
180         try {
181             tm.commit(t);
182         } catch (TransactionException e) {
183             // Expected, see comment above
184         }
185 
186         // create normal row with only one cell
187         t = tm.begin();
188         p = new Put(row3);
189         p.addColumn(family, col2, col2);
190         table.put(t, p);
191         try {
192             tm.commit(t);
193         } catch (TransactionException e) {
194             // Expected, see comment above
195         }
196     }
197 
198 }