View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.mockito.Mockito.spy;
21  import static org.testng.Assert.assertTrue;
22  
23  import java.io.IOException;
24  
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Coprocessor;
27  import org.apache.hadoop.hbase.HBaseTestingUtility;
28  import org.apache.hadoop.hbase.HColumnDescriptor;
29  import org.apache.hadoop.hbase.HTableDescriptor;
30  import org.apache.hadoop.hbase.MiniHBaseCluster;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.Admin;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.client.Get;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Result;
38  import org.apache.hadoop.hbase.client.ResultScanner;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.omid.TestUtils;
42  import org.apache.omid.committable.CommitTable;
43  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
44  import org.apache.omid.metrics.NullMetricsProvider;
45  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
46  import org.apache.omid.tso.TSOServer;
47  import org.apache.omid.tso.TSOServerConfig;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  import org.testng.annotations.AfterClass;
51  import org.testng.annotations.BeforeClass;
52  import org.testng.annotations.BeforeMethod;
53  import org.testng.annotations.Test;
54  
55  import com.google.inject.Guice;
56  import com.google.inject.Injector;
57  
58  public class TestSnapshotFilterLL {
59  
60      private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilterLL.class);
61  
62      private static final String TEST_FAMILY = "test-fam";
63  
64      private static final int MAX_VERSIONS = 3;
65  
66      private AbstractTransactionManager tm;
67  
68      private Injector injector;
69  
70      private Admin admin;
71      private Configuration hbaseConf;
72      private HBaseTestingUtility hbaseTestUtil;
73      private MiniHBaseCluster hbaseCluster;
74  
75      private TSOServer tso;
76  
77      private CommitTable commitTable;
78      private PostCommitActions syncPostCommitter;
79      private Connection connection;
80  
81      @BeforeClass
82      public void setupTestSnapshotFilter() throws Exception {
83          TSOServerConfig tsoConfig = new TSOServerConfig();
84          tsoConfig.setPort(5678);
85          tsoConfig.setConflictMapSize(1);
86          tsoConfig.setWaitStrategy("LOW_CPU");
87          tsoConfig.setLowLatency(true);
88          injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
89          hbaseConf = injector.getInstance(Configuration.class);
90          hbaseConf.setBoolean("omid.server.side.filter", true);
91          hbaseConf.setInt("hbase.hconnection.threads.core", 5);
92          hbaseConf.setInt("hbase.hconnection.threads.max", 10);
93          // Tunn down handler threads in regionserver
94          hbaseConf.setInt("hbase.regionserver.handler.count", 10);
95  
96          // Set to random port
97          hbaseConf.setInt("hbase.master.port", 0);
98          hbaseConf.setInt("hbase.master.info.port", 0);
99          hbaseConf.setInt("hbase.regionserver.port", 0);
100         hbaseConf.setInt("hbase.regionserver.info.port", 0);
101 
102 
103         HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
104         HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
105 
106         setupHBase();
107         connection = ConnectionFactory.createConnection(hbaseConf);
108         admin = connection.getAdmin();
109         createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
110         setupTSO();
111 
112         commitTable = injector.getInstance(CommitTable.class);
113     }
114 
115     private void setupHBase() throws Exception {
116         LOG.info("--------------------------------------------------------------------------------------------------");
117         LOG.info("Setting up HBase");
118         LOG.info("--------------------------------------------------------------------------------------------------");
119         hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
120         LOG.info("--------------------------------------------------------------------------------------------------");
121         LOG.info("Creating HBase MiniCluster");
122         LOG.info("--------------------------------------------------------------------------------------------------");
123         hbaseCluster = hbaseTestUtil.startMiniCluster(1);
124     }
125 
126     private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
127                                            HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
128         createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
129 
130         createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
131     }
132 
133     private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
134         if (!admin.tableExists(TableName.valueOf(tableName))) {
135             LOG.info("Creating {} table...", tableName);
136             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
137 
138             for (byte[] family : families) {
139                 HColumnDescriptor datafam = new HColumnDescriptor(family);
140                 datafam.setMaxVersions(MAX_VERSIONS);
141                 desc.addFamily(datafam);
142             }
143 
144             int priority = Coprocessor.PRIORITY_HIGHEST;
145 
146             desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
147             desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
148 
149             admin.createTable(desc);
150             try {
151                 hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
152             } catch (InterruptedException e) {
153                 e.printStackTrace();
154             }
155         }
156 
157     }
158 
159     private void setupTSO() throws IOException, InterruptedException {
160         tso = injector.getInstance(TSOServer.class);
161         tso.startAsync();
162         tso.awaitRunning();
163         TestUtils.waitForSocketListening("localhost", 5678, 100);
164         Thread.currentThread().setName("UnitTest(s) thread");
165     }
166 
167     @AfterClass
168     public void cleanupTestSnapshotFilter() throws Exception {
169         teardownTSO();
170         hbaseCluster.shutdown();
171     }
172 
173     private void teardownTSO() throws IOException, InterruptedException {
174         tso.stopAsync();
175         tso.awaitTerminated();
176         TestUtils.waitForSocketNotListening("localhost", 5678, 1000);
177     }
178 
179     @BeforeMethod
180     public void setupTestSnapshotFilterIndividualTest() throws Exception {
181         tm = spy((AbstractTransactionManager) newTransactionManager());
182     }
183 
184     private TransactionManager newTransactionManager() throws Exception {
185         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
186         hbaseOmidClientConf.setConnectionString("localhost:5678");
187         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
188         CommitTable.Client commitTableClient = commitTable.getClient();
189         syncPostCommitter =
190                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
191         return HBaseTransactionManager.builder(hbaseOmidClientConf)
192                 .postCommitter(syncPostCommitter)
193                 .commitTableClient(commitTableClient)
194                 .build();
195     }
196 
197 
198     @Test(timeOut = 60_000)
199     public void testInvalidate() throws Throwable {
200         byte[] rowName1 = Bytes.toBytes("row1");
201         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
202         byte[] colName1 = Bytes.toBytes("col1");
203         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
204 
205         String TEST_TABLE = "testGetFirstResult";
206         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
207         TTable tt = new TTable(connection, TEST_TABLE);
208 
209         Transaction tx1 = tm.begin();
210 
211         Put row1 = new Put(rowName1);
212         row1.addColumn(famName1, colName1, dataValue1);
213         tt.put(tx1, row1);
214 
215 
216         Transaction tx2 = tm.begin();
217 
218         Get get = new Get(rowName1);
219         Result result = tt.get(tx2, get);
220 
221         assertTrue(result.isEmpty(), "Result should not be empty!");
222 
223 
224         boolean gotInvalidated = false;
225         try {
226             tm.commit(tx1);
227         } catch (RollbackException e) {
228             gotInvalidated = true;
229         }
230         assertTrue(gotInvalidated);
231         assertTrue(tm.isLowLatency());
232     }
233 
234     @Test(timeOut = 60_000)
235     public void testInvalidateByScan() throws Throwable {
236         byte[] rowName1 = Bytes.toBytes("row1");
237         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
238         byte[] colName1 = Bytes.toBytes("col1");
239         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
240 
241         String TEST_TABLE = "testGetFirstResult";
242         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
243         TTable tt = new TTable(connection, TEST_TABLE);
244 
245         Transaction tx1 = tm.begin();
246 
247         Put row1 = new Put(rowName1);
248         row1.addColumn(famName1, colName1, dataValue1);
249         tt.put(tx1, row1);
250 
251 
252         Transaction tx2 = tm.begin();
253 
254         ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1));
255         assertTrue(iterableRS.next() == null);
256 
257         tm.commit(tx2);
258 
259         boolean gotInvalidated = false;
260         try {
261             tm.commit(tx1);
262         } catch (RollbackException e) {
263             gotInvalidated = true;
264         }
265         assertTrue(gotInvalidated);
266         assertTrue(tm.isLowLatency());
267     }
268 
269 }