1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.mockito.Mockito.spy;
21 import static org.testng.Assert.assertTrue;
22
23 import java.io.IOException;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Coprocessor;
27 import org.apache.hadoop.hbase.HBaseTestingUtility;
28 import org.apache.hadoop.hbase.HColumnDescriptor;
29 import org.apache.hadoop.hbase.HTableDescriptor;
30 import org.apache.hadoop.hbase.MiniHBaseCluster;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Admin;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Get;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Result;
38 import org.apache.hadoop.hbase.client.ResultScanner;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.omid.TestUtils;
42 import org.apache.omid.committable.CommitTable;
43 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
44 import org.apache.omid.metrics.NullMetricsProvider;
45 import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
46 import org.apache.omid.tso.TSOServer;
47 import org.apache.omid.tso.TSOServerConfig;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.testng.annotations.AfterClass;
51 import org.testng.annotations.BeforeClass;
52 import org.testng.annotations.BeforeMethod;
53 import org.testng.annotations.Test;
54
55 import com.google.inject.Guice;
56 import com.google.inject.Injector;
57
58 public class TestSnapshotFilterLL {
59
60 private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilterLL.class);
61
62 private static final String TEST_FAMILY = "test-fam";
63
64 private static final int MAX_VERSIONS = 3;
65
66 private AbstractTransactionManager tm;
67
68 private Injector injector;
69
70 private Admin admin;
71 private Configuration hbaseConf;
72 private HBaseTestingUtility hbaseTestUtil;
73 private MiniHBaseCluster hbaseCluster;
74
75 private TSOServer tso;
76
77 private CommitTable commitTable;
78 private PostCommitActions syncPostCommitter;
79 private Connection connection;
80
81 @BeforeClass
82 public void setupTestSnapshotFilter() throws Exception {
83 TSOServerConfig tsoConfig = new TSOServerConfig();
84 tsoConfig.setPort(5678);
85 tsoConfig.setConflictMapSize(1);
86 tsoConfig.setWaitStrategy("LOW_CPU");
87 tsoConfig.setLowLatency(true);
88 injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
89 hbaseConf = injector.getInstance(Configuration.class);
90 hbaseConf.setBoolean("omid.server.side.filter", true);
91 hbaseConf.setInt("hbase.hconnection.threads.core", 5);
92 hbaseConf.setInt("hbase.hconnection.threads.max", 10);
93
94 hbaseConf.setInt("hbase.regionserver.handler.count", 10);
95
96
97 hbaseConf.setInt("hbase.master.port", 0);
98 hbaseConf.setInt("hbase.master.info.port", 0);
99 hbaseConf.setInt("hbase.regionserver.port", 0);
100 hbaseConf.setInt("hbase.regionserver.info.port", 0);
101
102
103 HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
104 HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
105
106 setupHBase();
107 connection = ConnectionFactory.createConnection(hbaseConf);
108 admin = connection.getAdmin();
109 createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
110 setupTSO();
111
112 commitTable = injector.getInstance(CommitTable.class);
113 }
114
115 private void setupHBase() throws Exception {
116 LOG.info("--------------------------------------------------------------------------------------------------");
117 LOG.info("Setting up HBase");
118 LOG.info("--------------------------------------------------------------------------------------------------");
119 hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
120 LOG.info("--------------------------------------------------------------------------------------------------");
121 LOG.info("Creating HBase MiniCluster");
122 LOG.info("--------------------------------------------------------------------------------------------------");
123 hbaseCluster = hbaseTestUtil.startMiniCluster(1);
124 }
125
126 private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
127 HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
128 createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
129
130 createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
131 }
132
133 private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
134 if (!admin.tableExists(TableName.valueOf(tableName))) {
135 LOG.info("Creating {} table...", tableName);
136 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
137
138 for (byte[] family : families) {
139 HColumnDescriptor datafam = new HColumnDescriptor(family);
140 datafam.setMaxVersions(MAX_VERSIONS);
141 desc.addFamily(datafam);
142 }
143
144 int priority = Coprocessor.PRIORITY_HIGHEST;
145
146 desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
147 desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
148
149 admin.createTable(desc);
150 try {
151 hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
152 } catch (InterruptedException e) {
153 e.printStackTrace();
154 }
155 }
156
157 }
158
159 private void setupTSO() throws IOException, InterruptedException {
160 tso = injector.getInstance(TSOServer.class);
161 tso.startAsync();
162 tso.awaitRunning();
163 TestUtils.waitForSocketListening("localhost", 5678, 100);
164 Thread.currentThread().setName("UnitTest(s) thread");
165 }
166
167 @AfterClass
168 public void cleanupTestSnapshotFilter() throws Exception {
169 teardownTSO();
170 hbaseCluster.shutdown();
171 }
172
173 private void teardownTSO() throws IOException, InterruptedException {
174 tso.stopAsync();
175 tso.awaitTerminated();
176 TestUtils.waitForSocketNotListening("localhost", 5678, 1000);
177 }
178
179 @BeforeMethod
180 public void setupTestSnapshotFilterIndividualTest() throws Exception {
181 tm = spy((AbstractTransactionManager) newTransactionManager());
182 }
183
184 private TransactionManager newTransactionManager() throws Exception {
185 HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
186 hbaseOmidClientConf.setConnectionString("localhost:5678");
187 hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
188 CommitTable.Client commitTableClient = commitTable.getClient();
189 syncPostCommitter =
190 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
191 return HBaseTransactionManager.builder(hbaseOmidClientConf)
192 .postCommitter(syncPostCommitter)
193 .commitTableClient(commitTableClient)
194 .build();
195 }
196
197
198 @Test(timeOut = 60_000)
199 public void testInvalidate() throws Throwable {
200 byte[] rowName1 = Bytes.toBytes("row1");
201 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
202 byte[] colName1 = Bytes.toBytes("col1");
203 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
204
205 String TEST_TABLE = "testGetFirstResult";
206 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
207 TTable tt = new TTable(connection, TEST_TABLE);
208
209 Transaction tx1 = tm.begin();
210
211 Put row1 = new Put(rowName1);
212 row1.addColumn(famName1, colName1, dataValue1);
213 tt.put(tx1, row1);
214
215
216 Transaction tx2 = tm.begin();
217
218 Get get = new Get(rowName1);
219 Result result = tt.get(tx2, get);
220
221 assertTrue(result.isEmpty(), "Result should not be empty!");
222
223
224 boolean gotInvalidated = false;
225 try {
226 tm.commit(tx1);
227 } catch (RollbackException e) {
228 gotInvalidated = true;
229 }
230 assertTrue(gotInvalidated);
231 assertTrue(tm.isLowLatency());
232 }
233
234 @Test(timeOut = 60_000)
235 public void testInvalidateByScan() throws Throwable {
236 byte[] rowName1 = Bytes.toBytes("row1");
237 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
238 byte[] colName1 = Bytes.toBytes("col1");
239 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
240
241 String TEST_TABLE = "testGetFirstResult";
242 createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
243 TTable tt = new TTable(connection, TEST_TABLE);
244
245 Transaction tx1 = tm.begin();
246
247 Put row1 = new Put(rowName1);
248 row1.addColumn(famName1, colName1, dataValue1);
249 tt.put(tx1, row1);
250
251
252 Transaction tx2 = tm.begin();
253
254 ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1));
255 assertTrue(iterableRS.next() == null);
256
257 tm.commit(tx2);
258
259 boolean gotInvalidated = false;
260 try {
261 tm.commit(tx1);
262 } catch (RollbackException e) {
263 gotInvalidated = true;
264 }
265 assertTrue(gotInvalidated);
266 assertTrue(tm.isLowLatency());
267 }
268
269 }