1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.lang.reflect.Method;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.CellUtil;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.MiniHBaseCluster;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.client.Admin;
36 import org.apache.hadoop.hbase.client.Connection;
37 import org.apache.hadoop.hbase.client.ConnectionFactory;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.HBaseAdmin;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.omid.TestUtils;
44 import org.apache.omid.committable.CommitTable;
45 import org.apache.omid.committable.InMemoryCommitTable;
46 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
47 import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
48 import org.apache.omid.tools.hbase.OmidTableManager;
49 import org.apache.omid.tso.TSOMockModule;
50 import org.apache.omid.tso.TSOServer;
51 import org.apache.omid.tso.TSOServerConfig;
52 import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
53 import org.apache.omid.tso.client.OmidClientConfiguration;
54 import org.apache.omid.tso.client.TSOClient;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.testng.ITestContext;
58 import org.testng.annotations.AfterGroups;
59 import org.testng.annotations.AfterMethod;
60 import org.testng.annotations.BeforeGroups;
61 import org.testng.annotations.BeforeMethod;
62
63 import com.google.inject.Guice;
64 import com.google.inject.Injector;
65
66 public abstract class OmidTestBase {
67
68 private static final Logger LOG = LoggerFactory.getLogger(OmidTestBase.class);
69
70 static HBaseTestingUtility hBaseUtils;
71 private static MiniHBaseCluster hbaseCluster;
72 static Configuration hbaseConf;
73 static Connection connection;
74
75 protected static final String TEST_TABLE = "test";
76 protected static final String TEST_FAMILY = "data";
77 static final String TEST_FAMILY2 = "data2";
78
79 private HBaseCommitTableConfig hBaseCommitTableConfig;
80
81 @BeforeMethod(alwaysRun = true)
82 public void beforeClass(Method method) throws Exception {
83 Thread.currentThread().setName("UnitTest-" + method.getName());
84 }
85
86
87 @BeforeGroups(groups = "sharedHBase")
88 public void beforeGroups(ITestContext context) throws Exception {
89
90 TSOServerConfig tsoConfig = new TSOServerConfig();
91 tsoConfig.setPort(1234);
92 tsoConfig.setConflictMapSize(1000);
93 tsoConfig.setWaitStrategy("LOW_CPU");
94 tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
95 Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
96 LOG.info("Starting TSO");
97 TSOServer tso = injector.getInstance(TSOServer.class);
98 hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
99 HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
100 tso.startAsync();
101 tso.awaitRunning();
102 TestUtils.waitForSocketListening("localhost", 1234, 100);
103 LOG.info("Finished loading TSO");
104 context.setAttribute("tso", tso);
105
106 OmidClientConfiguration clientConf = new OmidClientConfiguration();
107 clientConf.setConnectionString("localhost:1234");
108 context.setAttribute("clientConf", clientConf);
109
110 InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
111 context.setAttribute("commitTable", commitTable);
112
113
114 TSOClient client = TSOClient.newInstance(clientConf);
115 context.setAttribute("client", client);
116
117
118
119
120 LOG.info("Creating HBase minicluster");
121 hbaseConf = HBaseConfiguration.create();
122 hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
123 hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
124 hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
125
126 File tempFile = File.createTempFile("OmidTest", "");
127 tempFile.deleteOnExit();
128 hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
129 hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
130 hBaseUtils = new HBaseTestingUtility(hbaseConf);
131 hbaseCluster = hBaseUtils.startMiniCluster(1);
132 connection = ConnectionFactory.createConnection(hbaseConf);
133 hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
134 new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
135 Integer.MAX_VALUE);
136 createTestTable();
137 createCommitTable();
138
139 LOG.info("HBase minicluster is up");
140 }
141
142 protected void createTestTable() throws IOException {
143 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
144 HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
145 HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
146 HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
147 datafam.setMaxVersions(Integer.MAX_VALUE);
148 datafam2.setMaxVersions(Integer.MAX_VALUE);
149 test_table_desc.addFamily(datafam);
150 test_table_desc.addFamily(datafam2);
151 admin.createTable(test_table_desc);
152 }
153
154 private void createCommitTable() throws IOException {
155 String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
156 OmidTableManager omidTableManager = new OmidTableManager(args);
157 omidTableManager.executeActionsOnHBase(hbaseConf);
158 }
159
160
161 private TSOServer getTSO(ITestContext context) {
162 return (TSOServer) context.getAttribute("tso");
163 }
164
165
166 TSOClient getClient(ITestContext context) {
167 return (TSOClient) context.getAttribute("client");
168 }
169
170 InMemoryCommitTable getCommitTable(ITestContext context) {
171 return (InMemoryCommitTable) context.getAttribute("commitTable");
172 }
173
174 protected TransactionManager newTransactionManager(ITestContext context) throws Exception {
175 return newTransactionManager(context, getClient(context));
176 }
177
178 protected TransactionManager newTransactionManager(ITestContext context, PostCommitActions postCommitActions) throws Exception {
179 HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
180 clientConf.setConnectionString("localhost:1234");
181 clientConf.setHBaseConfiguration(hbaseConf);
182 return HBaseTransactionManager.builder(clientConf)
183 .postCommitter(postCommitActions)
184 .commitTableClient(getCommitTable(context).getClient())
185 .commitTableWriter(getCommitTable(context).getWriter())
186 .tsoClient(getClient(context)).build();
187 }
188
189 protected TransactionManager newTransactionManager(ITestContext context, TSOClient tsoClient) throws Exception {
190 HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
191 clientConf.setConnectionString("localhost:1234");
192 clientConf.setHBaseConfiguration(hbaseConf);
193 return HBaseTransactionManager.builder(clientConf)
194 .commitTableClient(getCommitTable(context).getClient())
195 .commitTableWriter(getCommitTable(context).getWriter())
196 .tsoClient(tsoClient).build();
197 }
198
199 protected TransactionManager newTransactionManager(ITestContext context, CommitTable.Client commitTableClient)
200 throws Exception {
201 HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
202 clientConf.setConnectionString("localhost:1234");
203 clientConf.setHBaseConfiguration(hbaseConf);
204 return HBaseTransactionManager.builder(clientConf)
205 .commitTableClient(commitTableClient)
206 .commitTableWriter(getCommitTable(context).getWriter())
207 .tsoClient(getClient(context)).build();
208 }
209
210 @AfterGroups(groups = "sharedHBase")
211 public void afterGroups(ITestContext context) throws Exception {
212 LOG.info("Tearing down OmidTestBase...");
213 if (hbaseCluster != null) {
214 hBaseUtils.shutdownMiniCluster();
215 }
216
217 getClient(context).close().get();
218 getTSO(context).stopAsync();
219 getTSO(context).awaitTerminated();
220 TestUtils.waitForSocketNotListening("localhost", 1234, 1000);
221 }
222
223 @AfterMethod(groups = "sharedHBase", timeOut = 60_000)
224 public void afterMethod() {
225 try {
226 LOG.info("tearing Down");
227 Admin admin = hBaseUtils.getHBaseAdmin();
228 deleteTable(admin, TableName.valueOf(TEST_TABLE));
229 createTestTable();
230 if (hBaseCommitTableConfig != null) {
231 deleteTable(admin, TableName.valueOf(hBaseCommitTableConfig.getTableName()));
232 }
233 createCommitTable();
234 } catch (Exception e) {
235 LOG.error("Error tearing down", e);
236 }
237 }
238
239 void deleteTable(Admin admin, TableName tableName) throws IOException {
240 if (admin.tableExists(tableName)) {
241 if (admin.isTableDisabled(tableName)) {
242 admin.deleteTable(tableName);
243 } else {
244 admin.disableTable(tableName);
245 admin.deleteTable(tableName);
246 }
247 }
248 }
249
250 static boolean verifyValue(Table table, byte[] row,
251 byte[] fam, byte[] col, byte[] value) {
252
253 try {
254 Get g = new Get(row).setMaxVersions(1);
255 Result r = table.get(g);
256 Cell cell = r.getColumnLatestCell(fam, col);
257
258 if (LOG.isTraceEnabled()) {
259 LOG.trace("Value for " + table.getName().getNameAsString() + ":"
260 + Bytes.toString(row) + ":" + Bytes.toString(fam)
261 + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(cell))
262 + " (" + Bytes.toString(value) + " expected)");
263 }
264
265 return Bytes.equals(CellUtil.cloneValue(cell), value);
266 } catch (IOException e) {
267 LOG.error("Error reading row " + table.getName().getNameAsString() + ":"
268 + Bytes.toString(row) + ":" + Bytes.toString(fam)
269 + Bytes.toString(col), e);
270 return false;
271 }
272 }
273 }