1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20
21 import static org.mockito.Mockito.doReturn;
22
23 import static org.mockito.Mockito.spy;
24 import static org.testng.Assert.assertEquals;
25 import static org.testng.Assert.assertFalse;
26
27 import static org.testng.Assert.assertTrue;
28 import static org.testng.Assert.fail;
29
30 import java.io.IOException;
31
32
33 import java.util.Random;
34
35 import org.apache.hadoop.conf.Configuration;
36
37
38 import org.apache.hadoop.hbase.Coprocessor;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.MiniHBaseCluster;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.client.Admin;
45 import org.apache.hadoop.hbase.client.Connection;
46 import org.apache.hadoop.hbase.client.ConnectionFactory;
47
48 import org.apache.hadoop.hbase.client.Put;
49
50 import org.apache.hadoop.hbase.client.ResultScanner;
51
52 import org.apache.hadoop.hbase.client.Scan;
53 import org.apache.hadoop.hbase.client.Table;
54 import org.apache.hadoop.hbase.util.Bytes;
55
56 import org.apache.omid.TestUtils;
57 import org.apache.omid.committable.CommitTable;
58 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
59 import org.apache.omid.metrics.NullMetricsProvider;
60 import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
61 import org.apache.omid.tso.TSOServer;
62 import org.apache.omid.tso.TSOServerConfig;
63
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.testng.annotations.AfterClass;
67 import org.testng.annotations.BeforeClass;
68 import org.testng.annotations.BeforeMethod;
69 import org.testng.annotations.Test;
70
71 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
72 import com.google.inject.Guice;
73 import com.google.inject.Injector;
74
75 public class TestCompactionLL {
76
77 private static final Logger LOG = LoggerFactory.getLogger(TestCompactionLL.class);
78
79 private static final String TEST_FAMILY = "test-fam";
80 private static final String TEST_QUALIFIER = "test-qual";
81
82 private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
83 private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
84 private final byte[] data = Bytes.toBytes("testWrite-1");
85
86 private static final int MAX_VERSIONS = 3;
87
88 private Random randomGenerator;
89 private AbstractTransactionManager tm;
90
91 private Injector injector;
92
93 private Admin admin;
94 private Configuration hbaseConf;
95 private HBaseTestingUtility hbaseTestUtil;
96 private MiniHBaseCluster hbaseCluster;
97
98 private TSOServer tso;
99
100
101 private CommitTable commitTable;
102 private PostCommitActions syncPostCommitter;
103 private static Connection connection;
104
105 @BeforeClass
106 public void setupTestCompation() throws Exception {
107 TSOServerConfig tsoConfig = new TSOServerConfig();
108 tsoConfig.setPort(1235);
109 tsoConfig.setConflictMapSize(1);
110 tsoConfig.setLowLatency(true);
111 tsoConfig.setWaitStrategy("LOW_CPU");
112 injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
113 hbaseConf = injector.getInstance(Configuration.class);
114 HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
115 HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
116
117
118 hbaseConf.setInt("hbase.hstore.compaction.min", 2);
119 hbaseConf.setInt("hbase.hstore.compaction.max", 2);
120 setupHBase();
121 connection = ConnectionFactory.createConnection(hbaseConf);
122 admin = connection.getAdmin();
123 createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
124 setupTSO();
125
126 commitTable = injector.getInstance(CommitTable.class);
127 }
128
129 private void setupHBase() throws Exception {
130 LOG.info("--------------------------------------------------------------------------------------------------");
131 LOG.info("Setting up HBase");
132 LOG.info("--------------------------------------------------------------------------------------------------");
133 hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
134 LOG.info("--------------------------------------------------------------------------------------------------");
135 LOG.info("Creating HBase MiniCluster");
136 LOG.info("--------------------------------------------------------------------------------------------------");
137 hbaseCluster = hbaseTestUtil.startMiniCluster(1);
138 }
139
140 private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
141 HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
142 createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
143
144 createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
145 }
146
147 private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
148 if (!admin.tableExists(TableName.valueOf(tableName))) {
149 LOG.info("Creating {} table...", tableName);
150 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
151
152 for (byte[] family : families) {
153 HColumnDescriptor datafam = new HColumnDescriptor(family);
154 datafam.setMaxVersions(MAX_VERSIONS);
155 desc.addFamily(datafam);
156 }
157
158 desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
159 admin.createTable(desc);
160 for (byte[] family : families) {
161 CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
162 }
163 }
164
165 }
166
167 private void setupTSO() throws IOException, InterruptedException {
168 tso = injector.getInstance(TSOServer.class);
169 tso.startAsync();
170 tso.awaitRunning();
171 TestUtils.waitForSocketListening("localhost", 1235, 100);
172 Thread.currentThread().setName("UnitTest(s) thread");
173 }
174
175 @AfterClass
176 public void cleanupTestCompation() throws Exception {
177 teardownTSO();
178 hbaseCluster.shutdown();
179 }
180
181 private void teardownTSO() throws IOException, InterruptedException {
182 tso.stopAsync();
183 tso.awaitTerminated();
184 TestUtils.waitForSocketNotListening("localhost", 1235, 1000);
185 }
186
187 @BeforeMethod
188 public void setupTestCompactionIndividualTest() throws Exception {
189 randomGenerator = new Random(0xfeedcafeL);
190 tm = spy((AbstractTransactionManager) newTransactionManager());
191 }
192
193 private TransactionManager newTransactionManager() throws Exception {
194 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
195 hbaseOmidClientConf.setConnectionString("localhost:1235");
196 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
197 CommitTable.Client commitTableClient = commitTable.getClient();
198 syncPostCommitter =
199 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
200 return HBaseTransactionManager.builder(hbaseOmidClientConf)
201 .postCommitter(syncPostCommitter)
202 .commitTableClient(commitTableClient)
203 .build();
204 }
205
206
207
208 @Test(timeOut = 60_000)
209 public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
210 String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReachedLL";
211 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
212 TTable txTable = new TTable(connection, TEST_TABLE);
213
214
215
216 HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
217 long rowId = randomGenerator.nextLong();
218 Put put = new Put(Bytes.toBytes(rowId));
219 put.addColumn(fam, qual, data);
220 txTable.put(neverendingTx, put);
221 assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
222 new TTableCellGetterAdapter(txTable)),
223 "Cell should be there");
224 assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
225 new TTableCellGetterAdapter(txTable)),
226 "Shadow cell should not be there");
227
228 assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
229 LOG.info("Flushing table {}", TEST_TABLE);
230 admin.flush(TableName.valueOf(TEST_TABLE));
231 assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
232
233
234 LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
235 OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
236 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
237 CommitTable commitTable = injector.getInstance(CommitTable.class);
238 CommitTable.Client commitTableClient = spy(commitTable.getClient());
239 SettableFuture<Long> f = SettableFuture.create();
240 f.setException(new IOException("Unable to read"));
241 doReturn(f).when(commitTableClient).readLowWatermark();
242 omidCompactor.commitTableClient = commitTableClient;
243 LOG.info("Compacting table {}", TEST_TABLE);
244 admin.majorCompact(TableName.valueOf(TEST_TABLE));
245
246 LOG.info("Sleeping for 3 secs");
247 Thread.sleep(3000);
248 LOG.info("Waking up after 3 secs");
249
250
251 assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
252 assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
253 new TTableCellGetterAdapter(txTable)),
254 "Cell should be there");
255 assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
256 new TTableCellGetterAdapter(txTable)),
257 "Shadow cell should not be there");
258 }
259
260
261 @Test(timeOut = 60_000)
262
263 public void testCommitTableInvalidation() throws Exception {
264 String TEST_TABLE = "testCommitTableInvalidationLL";
265 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
266 TTable txTable = new TTable(connection, TEST_TABLE);
267 byte[] rowId = Bytes.toBytes("row");
268
269 HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
270 Put p = new Put(rowId);
271 p.addColumn(fam, qual, Bytes.toBytes("testValue"));
272 txTable.put(tx1, p);
273
274 HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
275 compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
276
277 try {
278
279 Thread.sleep(1000);
280
281 tm.commit(tx1);
282 fail(" Should have been invalidated");
283 } catch (RollbackException e) {
284 e.printStackTrace();
285 }
286 }
287
288
289 private void setCompactorLWM(long lwm, String tableName) throws Exception {
290 OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
291 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
292 CommitTable commitTable = injector.getInstance(CommitTable.class);
293 CommitTable.Client commitTableClient = spy(commitTable.getClient());
294 SettableFuture<Long> f = SettableFuture.create();
295 f.set(lwm);
296 doReturn(f).when(commitTableClient).readLowWatermark();
297 omidCompactor.commitTableClient = commitTableClient;
298 }
299
300 private void compactEverything(String tableName) throws Exception {
301 compactWithLWM(Long.MAX_VALUE, tableName);
302 }
303
304 private void compactWithLWM(long lwm, String tableName) throws Exception {
305 admin.flush(TableName.valueOf(tableName));
306
307 LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
308 setCompactorLWM(lwm, tableName);
309 LOG.info("Compacting table {}", tableName);
310 admin.majorCompact(TableName.valueOf(tableName));
311
312 LOG.info("Sleeping for 3 secs");
313 Thread.sleep(3000);
314 LOG.info("Waking up after 3 secs");
315 }
316
317 private static long rowCount(String tableName, byte[] family) throws Throwable {
318 Scan scan = new Scan();
319 scan.addFamily(family);
320 Table table = connection.getTable(TableName.valueOf(tableName));
321 try (ResultScanner scanner = table.getScanner(scan)) {
322 int count = 0;
323 while (scanner.next() != null) {
324 count++;
325 }
326 return count;
327 }
328 }
329 }