View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  
21  import static org.mockito.Mockito.doReturn;
22  
23  import static org.mockito.Mockito.spy;
24  import static org.testng.Assert.assertEquals;
25  import static org.testng.Assert.assertFalse;
26  
27  import static org.testng.Assert.assertTrue;
28  import static org.testng.Assert.fail;
29  
30  import java.io.IOException;
31  
32  
33  import java.util.Random;
34  
35  import org.apache.hadoop.conf.Configuration;
36  
37  
38  import org.apache.hadoop.hbase.Coprocessor;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.MiniHBaseCluster;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.client.Admin;
45  import org.apache.hadoop.hbase.client.Connection;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  
48  import org.apache.hadoop.hbase.client.Put;
49  
50  import org.apache.hadoop.hbase.client.ResultScanner;
51  
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.client.Table;
54  import org.apache.hadoop.hbase.util.Bytes;
55  
56  import org.apache.omid.TestUtils;
57  import org.apache.omid.committable.CommitTable;
58  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
59  import org.apache.omid.metrics.NullMetricsProvider;
60  import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
61  import org.apache.omid.tso.TSOServer;
62  import org.apache.omid.tso.TSOServerConfig;
63  
64  import org.slf4j.Logger;
65  import org.slf4j.LoggerFactory;
66  import org.testng.annotations.AfterClass;
67  import org.testng.annotations.BeforeClass;
68  import org.testng.annotations.BeforeMethod;
69  import org.testng.annotations.Test;
70  
71  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
72  import com.google.inject.Guice;
73  import com.google.inject.Injector;
74  
75  public class TestCompactionLL {
76  
77      private static final Logger LOG = LoggerFactory.getLogger(TestCompactionLL.class);
78  
79      private static final String TEST_FAMILY = "test-fam";
80      private static final String TEST_QUALIFIER = "test-qual";
81  
82      private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
83      private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
84      private final byte[] data = Bytes.toBytes("testWrite-1");
85  
86      private static final int MAX_VERSIONS = 3;
87  
88      private Random randomGenerator;
89      private AbstractTransactionManager tm;
90  
91      private Injector injector;
92  
93      private Admin admin;
94      private Configuration hbaseConf;
95      private HBaseTestingUtility hbaseTestUtil;
96      private MiniHBaseCluster hbaseCluster;
97  
98      private TSOServer tso;
99  
100 
101     private CommitTable commitTable;
102     private PostCommitActions syncPostCommitter;
103     private static Connection connection;
104 
105     @BeforeClass
106     public void setupTestCompation() throws Exception {
107         TSOServerConfig tsoConfig = new TSOServerConfig();
108         tsoConfig.setPort(1235);
109         tsoConfig.setConflictMapSize(1);
110         tsoConfig.setLowLatency(true);
111         tsoConfig.setWaitStrategy("LOW_CPU");
112         injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
113         hbaseConf = injector.getInstance(Configuration.class);
114         HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
115         HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
116 
117         // settings required for #testDuplicateDeletes()
118         hbaseConf.setInt("hbase.hstore.compaction.min", 2);
119         hbaseConf.setInt("hbase.hstore.compaction.max", 2);
120         setupHBase();
121         connection = ConnectionFactory.createConnection(hbaseConf);
122         admin = connection.getAdmin();
123         createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
124         setupTSO();
125 
126         commitTable = injector.getInstance(CommitTable.class);
127     }
128 
129     private void setupHBase() throws Exception {
130         LOG.info("--------------------------------------------------------------------------------------------------");
131         LOG.info("Setting up HBase");
132         LOG.info("--------------------------------------------------------------------------------------------------");
133         hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
134         LOG.info("--------------------------------------------------------------------------------------------------");
135         LOG.info("Creating HBase MiniCluster");
136         LOG.info("--------------------------------------------------------------------------------------------------");
137         hbaseCluster = hbaseTestUtil.startMiniCluster(1);
138     }
139 
140     private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
141                                            HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
142         createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
143 
144         createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
145     }
146 
147     private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
148         if (!admin.tableExists(TableName.valueOf(tableName))) {
149             LOG.info("Creating {} table...", tableName);
150             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
151 
152             for (byte[] family : families) {
153                 HColumnDescriptor datafam = new HColumnDescriptor(family);
154                 datafam.setMaxVersions(MAX_VERSIONS);
155                 desc.addFamily(datafam);
156             }
157 
158             desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
159             admin.createTable(desc);
160             for (byte[] family : families) {
161                 CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
162             }
163         }
164 
165     }
166 
167     private void setupTSO() throws IOException, InterruptedException {
168         tso = injector.getInstance(TSOServer.class);
169         tso.startAsync();
170         tso.awaitRunning();
171         TestUtils.waitForSocketListening("localhost", 1235, 100);
172         Thread.currentThread().setName("UnitTest(s) thread");
173     }
174 
175     @AfterClass
176     public void cleanupTestCompation() throws Exception {
177         teardownTSO();
178         hbaseCluster.shutdown();
179     }
180 
181     private void teardownTSO() throws IOException, InterruptedException {
182         tso.stopAsync();
183         tso.awaitTerminated();
184         TestUtils.waitForSocketNotListening("localhost", 1235, 1000);
185     }
186 
187     @BeforeMethod
188     public void setupTestCompactionIndividualTest() throws Exception {
189         randomGenerator = new Random(0xfeedcafeL);
190         tm = spy((AbstractTransactionManager) newTransactionManager());
191     }
192 
193     private TransactionManager newTransactionManager() throws Exception {
194         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
195         hbaseOmidClientConf.setConnectionString("localhost:1235");
196         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
197         CommitTable.Client commitTableClient = commitTable.getClient();
198         syncPostCommitter =
199                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
200         return HBaseTransactionManager.builder(hbaseOmidClientConf)
201                 .postCommitter(syncPostCommitter)
202                 .commitTableClient(commitTableClient)
203                 .build();
204     }
205 
206 
207 
208     @Test(timeOut = 60_000)
209     public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
210         String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReachedLL";
211         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
212         TTable txTable = new TTable(connection, TEST_TABLE);
213 
214         // The KV in this transaction should be discarded but in the end should remain there because
215         // the commit table won't be accessed (simulating an error on access)
216         HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
217         long rowId = randomGenerator.nextLong();
218         Put put = new Put(Bytes.toBytes(rowId));
219         put.addColumn(fam, qual, data);
220         txTable.put(neverendingTx, put);
221         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
222                 new TTableCellGetterAdapter(txTable)),
223                 "Cell should be there");
224         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
225                 new TTableCellGetterAdapter(txTable)),
226                 "Shadow cell should not be there");
227 
228         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
229         LOG.info("Flushing table {}", TEST_TABLE);
230         admin.flush(TableName.valueOf(TEST_TABLE));
231         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
232 
233         // Break access to CommitTable functionality in Compactor
234         LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
235         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
236                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
237         CommitTable commitTable = injector.getInstance(CommitTable.class);
238         CommitTable.Client commitTableClient = spy(commitTable.getClient());
239         SettableFuture<Long> f = SettableFuture.create();
240         f.setException(new IOException("Unable to read"));
241         doReturn(f).when(commitTableClient).readLowWatermark();
242         omidCompactor.commitTableClient = commitTableClient;
243         LOG.info("Compacting table {}", TEST_TABLE);
244         admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
245 
246         LOG.info("Sleeping for 3 secs");
247         Thread.sleep(3000);
248         LOG.info("Waking up after 3 secs");
249 
250         // All rows should be there after the failed compaction
251         assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
252         assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
253                 new TTableCellGetterAdapter(txTable)),
254                 "Cell should be there");
255         assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
256                 new TTableCellGetterAdapter(txTable)),
257                 "Shadow cell should not be there");
258     }
259 
260 
261     @Test(timeOut = 60_000)
262     // test omid-147 in ll mode the scanner should invalidate the transaction
263     public void testCommitTableInvalidation() throws Exception {
264         String TEST_TABLE = "testCommitTableInvalidationLL";
265         createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
266         TTable txTable = new TTable(connection, TEST_TABLE);
267         byte[] rowId = Bytes.toBytes("row");
268 
269         HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
270         Put p = new Put(rowId);
271         p.addColumn(fam, qual, Bytes.toBytes("testValue"));
272         txTable.put(tx1, p);
273 
274         HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
275         compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
276 
277         try {
278             //give compaction time to invalidate
279             Thread.sleep(1000);
280 
281             tm.commit(tx1);
282             fail(" Should have been invalidated");
283         } catch (RollbackException e) {
284             e.printStackTrace();
285         }
286     }
287 
288 
289     private void setCompactorLWM(long lwm, String tableName) throws Exception {
290         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
291                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
292         CommitTable commitTable = injector.getInstance(CommitTable.class);
293         CommitTable.Client commitTableClient = spy(commitTable.getClient());
294         SettableFuture<Long> f = SettableFuture.create();
295         f.set(lwm);
296         doReturn(f).when(commitTableClient).readLowWatermark();
297         omidCompactor.commitTableClient = commitTableClient;
298     }
299 
300     private void compactEverything(String tableName) throws Exception {
301         compactWithLWM(Long.MAX_VALUE, tableName);
302     }
303 
304     private void compactWithLWM(long lwm, String tableName) throws Exception {
305         admin.flush(TableName.valueOf(tableName));
306 
307         LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
308         setCompactorLWM(lwm, tableName);
309         LOG.info("Compacting table {}", tableName);
310         admin.majorCompact(TableName.valueOf(tableName));
311 
312         LOG.info("Sleeping for 3 secs");
313         Thread.sleep(3000);
314         LOG.info("Waking up after 3 secs");
315     }
316 
317     private static long rowCount(String tableName, byte[] family) throws Throwable {
318         Scan scan = new Scan();
319         scan.addFamily(family);
320         Table table = connection.getTable(TableName.valueOf(tableName));
321         try (ResultScanner scanner = table.getScanner(scan)) {
322             int count = 0;
323             while (scanner.next() != null) {
324                 count++;
325             }
326             return count;
327         }
328     }
329 }