1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20
21 import static com.google.common.base.Charsets.UTF_8;
22 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
23 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.DEFAULT_COMMIT_TABLE_CF_NAME;
24 import static org.mockito.Mockito.spy;
25 import static org.testng.Assert.assertFalse;
26 import static org.testng.Assert.assertTrue;
27 import com.google.inject.Guice;
28 import com.google.inject.Injector;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.util.Bytes;
35
36 import org.apache.omid.committable.hbase.KeyGenerator;
37 import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
38
39 import org.apache.omid.tso.client.OmidClientConfiguration;
40 import org.apache.omid.tso.client.TSOClient;
41
42 import org.testng.ITestContext;
43 import org.testng.annotations.BeforeClass;
44 import org.testng.annotations.Test;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import java.io.File;
49 import java.io.IOException;
50
51
52 import org.apache.hadoop.conf.Configuration;
53
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HBaseTestingUtility;
56 import org.apache.hadoop.hbase.HColumnDescriptor;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.MiniHBaseCluster;
59
60 import org.apache.hadoop.hbase.client.Connection;
61 import org.apache.hadoop.hbase.client.ConnectionFactory;
62
63 import org.apache.hadoop.hbase.client.HBaseAdmin;
64
65 import org.apache.omid.TestUtils;
66
67
68 import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
69 import org.apache.omid.tools.hbase.OmidTableManager;
70 import org.apache.omid.tso.TSOMockModule;
71 import org.apache.omid.tso.TSOServer;
72 import org.apache.omid.tso.TSOServerConfig;
73
74
75 public class TestOmidLLRaces {
76
77 static HBaseTestingUtility hBaseUtils;
78 private static MiniHBaseCluster hbaseCluster;
79 static Configuration hbaseConf;
80 static Connection connection;
81
82 private static final String TEST_FAMILY = "data";
83 static final String TEST_FAMILY2 = "data2";
84 private static final String TEST_TABLE = "test";
85 private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
86 private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
87 private static final byte[] family = Bytes.toBytes("data");
88 private static final byte[] qualifier = Bytes.toBytes("testdata");
89 private static final byte[] data1 = Bytes.toBytes("testWrite-1");
90
91 private static final Logger LOG = LoggerFactory.getLogger(TestOmidLLRaces.class);
92 private TSOClient client;
93
94 @BeforeClass
95 public void setup() throws Exception {
96
97 TSOServerConfig tsoConfig = new TSOServerConfig();
98 tsoConfig.setPort(1234);
99 tsoConfig.setConflictMapSize(1000);
100 tsoConfig.setLowLatency(true);
101 tsoConfig.setWaitStrategy("LOW_CPU");
102 Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
103 LOG.info("Starting TSO");
104 TSOServer tso = injector.getInstance(TSOServer.class);
105 HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
106 tso.startAsync();
107 tso.awaitRunning();
108 TestUtils.waitForSocketListening("localhost", 1234, 100);
109 LOG.info("Finished loading TSO");
110
111 OmidClientConfiguration clientConf = new OmidClientConfiguration();
112 clientConf.setConnectionString("localhost:1234");
113
114
115 client = TSOClient.newInstance(clientConf);
116
117
118
119
120 LOG.info("Creating HBase minicluster");
121 hbaseConf = HBaseConfiguration.create();
122 hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
123 hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
124 hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
125
126 File tempFile = File.createTempFile("OmidTest", "");
127 tempFile.deleteOnExit();
128 hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
129 hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
130 hBaseUtils = new HBaseTestingUtility(hbaseConf);
131 hbaseCluster = hBaseUtils.startMiniCluster(1);
132 connection = ConnectionFactory.createConnection(hbaseConf);
133 hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
134 new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
135 Integer.MAX_VALUE);
136 createTestTable();
137 createCommitTable();
138
139 LOG.info("HBase minicluster is up");
140 }
141
142
143 private void createCommitTable() throws IOException {
144 String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
145 OmidTableManager omidTableManager = new OmidTableManager(args);
146 omidTableManager.executeActionsOnHBase(hbaseConf);
147 }
148
149 private void createTestTable() throws IOException {
150 HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
151 HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
152 HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
153 HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
154 datafam.setMaxVersions(Integer.MAX_VALUE);
155 datafam2.setMaxVersions(Integer.MAX_VALUE);
156 test_table_desc.addFamily(datafam);
157 test_table_desc.addFamily(datafam2);
158 admin.createTable(test_table_desc);
159 }
160
161 protected TransactionManager newTransactionManagerHBaseCommitTable(TSOClient tsoClient) throws Exception {
162 HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
163 clientConf.setConnectionString("localhost:1234");
164 clientConf.setHBaseConfiguration(hbaseConf);
165 return HBaseTransactionManager.builder(clientConf)
166 .tsoClient(tsoClient).build();
167 }
168
169
170 @Test(timeOut = 30_000)
171 public void testIsCommitted() throws Exception {
172 AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
173
174 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
175 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
176 tm.getCommitTableClient());
177 TTable table = spy(new TTable(htable, snapshotFilter, false));
178
179 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
180
181 Put put = new Put(row1);
182 put.addColumn(family, qualifier, data1);
183 table.put(t1, put);
184 tm.commit(t1);
185
186 HBaseTransaction t2 = (HBaseTransaction) tm.begin();
187 put = new Put(row2);
188 put.addColumn(family, qualifier, data1);
189 table.put(t2, put);
190 table.flushCommits();
191
192 HBaseTransaction t3 = (HBaseTransaction) tm.begin();
193 put = new Put(row2);
194 put.addColumn(family, qualifier, data1);
195 table.put(t3, put);
196 tm.commit(t3);
197
198 HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
199 HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
200 HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
201
202 assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
203 assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
204 assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
205 assertTrue(tm.isLowLatency());
206 }
207
208
209 @Test(timeOut = 30_000)
210 public void testInvalidation(ITestContext context) throws Exception {
211 AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
212
213 Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
214 SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
215 tm.getCommitTableClient());
216 TTable table = spy(new TTable(htable, snapshotFilter, false));
217
218 HBaseTransaction t1 = (HBaseTransaction) tm.begin();
219 Put put = new Put(row1);
220 put.addColumn(family, qualifier, data1);
221 table.put(t1, put);
222
223 HBaseTransaction t2 = (HBaseTransaction) tm.begin();
224 Get get = new Get(row1);
225 get.addColumn(family, qualifier);
226 table.get(t2,get);
227
228
229 Table commitTable = connection.getTable(TableName.valueOf("OMID_COMMIT_TABLE"));
230 KeyGenerator keygen = KeyGeneratorImplementations.defaultKeyGenerator();
231 byte[] row = keygen.startTimestampToKey(t1.getStartTimestamp());
232 Get getInvalidation = new Get(row);
233 getInvalidation.addColumn(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME),"IT".getBytes(UTF_8));
234 Result res = commitTable.get(getInvalidation);
235 int val = Bytes.toInt(res.getValue(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME), "IT".getBytes(UTF_8)));
236 assertTrue(val == 1);
237
238 boolean gotInvalidated = false;
239 try {
240 tm.commit(t1);
241 } catch (RollbackException e) {
242 gotInvalidated = true;
243 }
244 assertTrue(gotInvalidated);
245 tm.commit(t2);
246 Thread.sleep(1000);
247 res = commitTable.get(getInvalidation);
248 assertTrue(res.isEmpty());
249 assertTrue(tm.isLowLatency());
250 }
251 }