View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.lang.reflect.Method;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.MiniHBaseCluster;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.Admin;
36  import org.apache.hadoop.hbase.client.Connection;
37  import org.apache.hadoop.hbase.client.ConnectionFactory;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.HBaseAdmin;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.Table;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.omid.TestUtils;
44  import org.apache.omid.committable.CommitTable;
45  import org.apache.omid.committable.InMemoryCommitTable;
46  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
47  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
48  import org.apache.omid.tools.hbase.OmidTableManager;
49  import org.apache.omid.tso.TSOMockModule;
50  import org.apache.omid.tso.TSOServer;
51  import org.apache.omid.tso.TSOServerConfig;
52  import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
53  import org.apache.omid.tso.client.OmidClientConfiguration;
54  import org.apache.omid.tso.client.TSOClient;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  import org.testng.ITestContext;
58  import org.testng.annotations.AfterGroups;
59  import org.testng.annotations.AfterMethod;
60  import org.testng.annotations.BeforeGroups;
61  import org.testng.annotations.BeforeMethod;
62  
63  import com.google.inject.Guice;
64  import com.google.inject.Injector;
65  
66  public abstract class OmidTestBase {
67  
68      private static final Logger LOG = LoggerFactory.getLogger(OmidTestBase.class);
69  
70      static HBaseTestingUtility hBaseUtils;
71      private static MiniHBaseCluster hbaseCluster;
72      static Configuration hbaseConf;
73      static Connection connection;
74  
75      protected static final String TEST_TABLE = "test";
76      protected static final String TEST_FAMILY = "data";
77      static final String TEST_FAMILY2 = "data2";
78  
79      private HBaseCommitTableConfig hBaseCommitTableConfig;
80  
81      @BeforeMethod(alwaysRun = true)
82      public void beforeClass(Method method) throws Exception {
83          Thread.currentThread().setName("UnitTest-" + method.getName());
84      }
85  
86  
87      @BeforeGroups(groups = "sharedHBase")
88      public void beforeGroups(ITestContext context) throws Exception {
89          // TSO Setup
90          TSOServerConfig tsoConfig = new TSOServerConfig();
91          tsoConfig.setPort(1234);
92          tsoConfig.setConflictMapSize(1000);
93          tsoConfig.setWaitStrategy("LOW_CPU");
94          tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
95          Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
96          LOG.info("Starting TSO");
97          TSOServer tso = injector.getInstance(TSOServer.class);
98          hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
99          HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
100         tso.startAsync();
101         tso.awaitRunning();
102         TestUtils.waitForSocketListening("localhost", 1234, 100);
103         LOG.info("Finished loading TSO");
104         context.setAttribute("tso", tso);
105 
106         OmidClientConfiguration clientConf = new OmidClientConfiguration();
107         clientConf.setConnectionString("localhost:1234");
108         context.setAttribute("clientConf", clientConf);
109 
110         InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
111         context.setAttribute("commitTable", commitTable);
112 
113         // Create the associated Handler
114         TSOClient client = TSOClient.newInstance(clientConf);
115         context.setAttribute("client", client);
116 
117         // ------------------------------------------------------------------------------------------------------------
118         // HBase setup
119         // ------------------------------------------------------------------------------------------------------------
120         LOG.info("Creating HBase minicluster");
121         hbaseConf = HBaseConfiguration.create();
122         hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
123         hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
124         hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
125 
126         File tempFile = File.createTempFile("OmidTest", "");
127         tempFile.deleteOnExit();
128         hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
129         hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
130         hBaseUtils = new HBaseTestingUtility(hbaseConf);
131         hbaseCluster = hBaseUtils.startMiniCluster(1);
132         connection = ConnectionFactory.createConnection(hbaseConf);
133         hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
134                 new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
135                 Integer.MAX_VALUE);
136         createTestTable();
137         createCommitTable();
138 
139         LOG.info("HBase minicluster is up");
140     }
141 
142     protected void createTestTable() throws IOException {
143         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
144         HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
145         HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
146         HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
147         datafam.setMaxVersions(Integer.MAX_VALUE);
148         datafam2.setMaxVersions(Integer.MAX_VALUE);
149         test_table_desc.addFamily(datafam);
150         test_table_desc.addFamily(datafam2);
151         admin.createTable(test_table_desc);
152     }
153 
154     private void createCommitTable() throws IOException {
155         String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
156         OmidTableManager omidTableManager = new OmidTableManager(args);
157         omidTableManager.executeActionsOnHBase(hbaseConf);
158     }
159 
160 
161     private TSOServer getTSO(ITestContext context) {
162         return (TSOServer) context.getAttribute("tso");
163     }
164 
165 
166     TSOClient getClient(ITestContext context) {
167         return (TSOClient) context.getAttribute("client");
168     }
169 
170     InMemoryCommitTable getCommitTable(ITestContext context) {
171         return (InMemoryCommitTable) context.getAttribute("commitTable");
172     }
173 
174     protected TransactionManager newTransactionManager(ITestContext context) throws Exception {
175         return newTransactionManager(context, getClient(context));
176     }
177 
178     protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions) throws Exception {
179         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
180         clientConf.setConnectionString("localhost:1234");
181         clientConf.setHBaseConfiguration(hbaseConf);
182         return HBaseTransactionManager.builder(clientConf)
183                 .postCommitter(postCommitActions)
184                 .commitTableClient(getCommitTable(context).getClient())
185                 .commitTableWriter(getCommitTable(context).getWriter())
186                 .tsoClient(getClient(context)).build();
187     }
188 
189     protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception {
190         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
191         clientConf.setConnectionString("localhost:1234");
192         clientConf.setHBaseConfiguration(hbaseConf);
193         return HBaseTransactionManager.builder(clientConf)
194                 .commitTableClient(getCommitTable(context).getClient())
195                 .commitTableWriter(getCommitTable(context).getWriter())
196                 .tsoClient(tsoClient).build();
197     }
198 
199     protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient)
200             throws Exception {
201         HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
202         clientConf.setConnectionString("localhost:1234");
203         clientConf.setHBaseConfiguration(hbaseConf);
204         return HBaseTransactionManager.builder(clientConf)
205                 .commitTableClient(commitTableClient)
206                 .commitTableWriter(getCommitTable(context).getWriter())
207                 .tsoClient(getClient(context)).build();
208     }
209 
210     @AfterGroups(groups = "sharedHBase")
211     public void afterGroups(ITestContext context) throws Exception {
212         LOG.info("Tearing down OmidTestBase...");
213         if (hbaseCluster != null) {
214             hBaseUtils.shutdownMiniCluster();
215         }
216 
217         getClient(context).close().get();
218         getTSO(context).stopAsync();
219         getTSO(context).awaitTerminated();
220         TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
221     }
222 
223     @AfterMethod(groups = "sharedHBase", timeOut = 60_000)
224     public void afterMethod() {
225         try {
226             LOG.info("tearing Down");
227             Admin admin = hBaseUtils.getHBaseAdmin();
228             deleteTable(admin, TableName.valueOf(TEST_TABLE));
229             createTestTable();
230             if (hBaseCommitTableConfig != null) {
231                 deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
232             }
233             createCommitTable();
234         } catch (Exception e) {
235             LOG.error("Error tearing down", e);
236         }
237     }
238 
239     void deleteTable(Admin admin, TableName tableName) throws IOException {
240         if (admin.tableExists(tableName)) {
241             if (admin.isTableDisabled(tableName)) {
242                 admin.deleteTable(tableName);
243             } else {
244                 admin.disableTable(tableName);
245                 admin.deleteTable(tableName);
246             }
247         }
248     }
249 
250     static boolean verifyValue(Table table, byte[] row,
251                                byte[] fam, byte[] col, byte[] value) {
252 
253         try {
254             Get g = new Get(row).setMaxVersions(1);
255             Result r = table.get(g);
256             Cell cell = r.getColumnLatestCell(fam, col);
257 
258             if (LOG.isTraceEnabled()) {
259                 LOG.trace("Value for " + table.getName().getNameAsString() + ":"
260                                   + Bytes.toString(row) + ":" + Bytes.toString(fam)
261                                   + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(cell))
262                                   + " (" + Bytes.toString(value) + " expected)");
263             }
264 
265             return Bytes.equals(CellUtil.cloneValue(cell), value);
266         } catch (IOException e) {
267             LOG.error("Error reading row " + table.getName().getNameAsString() + ":"
268                               + Bytes.toString(row) + ":" + Bytes.toString(fam)
269                               + Bytes.toString(col), e);
270             return false;
271         }
272     }
273 }