1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
22 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
23 import com.google.inject.Guice;
24 import com.google.inject.Injector;
25 import com.google.inject.Module;
26
27 import org.apache.omid.TestUtils;
28 import org.apache.omid.committable.CommitTable;
29 import org.apache.omid.tso.LowWatermarkWriter;
30 import org.apache.omid.tso.TSOMockModule;
31 import org.apache.omid.tso.TSOServer;
32 import org.apache.omid.tso.TSOServerConfig;
33 import org.apache.omid.tso.util.DummyCellIdImpl;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.testng.Assert;
37 import org.testng.annotations.AfterClass;
38 import org.testng.annotations.BeforeClass;
39 import org.testng.annotations.Test;
40
41 import java.util.HashSet;
42 import java.util.Set;
43 import java.util.concurrent.ExecutionException;
44
45 import static org.mockito.Matchers.any;
46 import static org.mockito.Mockito.*;
47 import static org.testng.Assert.assertEquals;
48 import static org.testng.Assert.assertFalse;
49 import static org.testng.Assert.assertNotNull;
50 import static org.testng.Assert.assertTrue;
51
52 public class TestIntegrationOfTSOClientServerBasicFunctionality {
53
54 private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
55
56 private static final String TSO_SERVER_HOST = "localhost";
57 private int tsoServerPortForTest;
58
59
60 private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
61 private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
62
63
64 private TSOServer tsoServer;
65 private TSOClient tsoClient;
66 private TSOClient justAnotherTSOClient;
67 private CommitTable.Client commitTableClient;
68 private LowWatermarkWriter lowWatermarkWriter;
69
70 @BeforeClass
71 public void setup() throws Exception {
72
73 tsoServerPortForTest = TestUtils.getFreeLocalPort();
74
75 TSOServerConfig tsoConfig = new TSOServerConfig();
76 tsoConfig.setConflictMapSize(1000);
77 tsoConfig.setPort(tsoServerPortForTest);
78 Module tsoServerMockModule = new TSOMockModule(tsoConfig);
79 Injector injector = Guice.createInjector(tsoServerMockModule);
80
81 lowWatermarkWriter = injector.getInstance(LowWatermarkWriter.class);
82
83 CommitTable commitTable = injector.getInstance(CommitTable.class);
84 commitTableClient = commitTable.getClient();
85
86 LOG.info("==================================================================================================");
87 LOG.info("======================================= Init TSO Server ==========================================");
88 LOG.info("==================================================================================================");
89
90 tsoServer = injector.getInstance(TSOServer.class);
91 tsoServer.startAsync();
92 tsoServer.awaitRunning();
93 TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
94
95 LOG.info("==================================================================================================");
96 LOG.info("===================================== TSO Server Initialized =====================================");
97 LOG.info("==================================================================================================");
98
99 LOG.info("==================================================================================================");
100 LOG.info("======================================= Setup TSO Clients ========================================");
101 LOG.info("==================================================================================================");
102
103
104 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
105 tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
106
107 tsoClient = TSOClient.newInstance(tsoClientConf);
108 justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
109
110 LOG.info("==================================================================================================");
111 LOG.info("===================================== TSO Clients Initialized ====================================");
112 LOG.info("==================================================================================================");
113
114 Thread.currentThread().setName("Test Thread");
115
116 }
117
118 @AfterClass
119 public void tearDown() throws Exception {
120
121 tsoClient.close().get();
122
123 tsoServer.stopAsync();
124 tsoServer.awaitTerminated();
125 tsoServer = null;
126 TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
127
128 }
129
130 @Test(timeOut = 30_000)
131 public void testTimestampsOrderingGrowMonotonically() throws Exception {
132 long referenceTimestamp;
133 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
134 referenceTimestamp = startTsTx1;
135
136 long startTsTx2 = tsoClient.getNewStartTimestamp().get();
137 referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
138 assertTrue(startTsTx2 >= referenceTimestamp, "Should grow monotonically");
139 assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
140
141 long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
142 referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
143 assertTrue(commitTsTx2 >= referenceTimestamp, "Should grow monotonically");
144
145 long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
146 referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
147 assertTrue(commitTsTx1 >= referenceTimestamp, "Should grow monotonically");
148
149 long startTsTx3 = tsoClient.getNewStartTimestamp().get();
150 referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
151 assertTrue(startTsTx3 >= referenceTimestamp, "Should grow monotonically");
152 }
153
154 @Test(timeOut = 30_000)
155 public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
156 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
157 long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
158 assertTrue(commitTsTx1 > startTsTx1);
159 }
160
161 @Test(timeOut = 30_000)
162 public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
163 long startTs = tsoClient.getNewStartTimestamp().get();
164
165 Set<CellId> cells = new HashSet<>();
166 for (int i = 0; i < 1_000_000; i++) {
167 cells.add(new DummyCellIdImpl(i));
168 }
169
170 long commitTs = tsoClient.commit(startTs, cells).get();
171 assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
172 }
173
174 @Test(timeOut = 30_000)
175 public void testMultipleSerialCommitsDoNotConflict() throws Exception {
176 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
177 long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
178 assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
179
180 long startTsTx2 = tsoClient.getNewStartTimestamp().get();
181 assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
182
183 long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
184 assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
185
186 long startTsTx3 = tsoClient.getNewStartTimestamp().get();
187 long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
188 assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
189 }
190
191 @Test(timeOut = 30_000)
192 public void testCommitWritesToCommitTable() throws Exception {
193
194 long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
195 long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
196 assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
197
198 if (!tsoClient.isLowLatency())
199 assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
200 "Commit TS for Tx1 shouldn't appear in Commit Table");
201
202 long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
203 assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
204
205 if (!tsoClient.isLowLatency()) {
206 Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
207 assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
208 assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
209 "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
210 assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
211 } else {
212 assertTrue(commitTsForTx1 > startTsForTx2, "Commit TS should be higher than tx's Start TS");
213 }
214
215
216 }
217
218 @Test(timeOut = 30_000)
219 public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
220 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
221 long startTsTx2 = tsoClient.getNewStartTimestamp().get();
222 assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
223
224 long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
225 assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
226
227 try {
228 tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
229 Assert.fail("Second TX should fail on commit");
230 } catch (ExecutionException ee) {
231 assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
232 }
233 }
234
235 @Test(timeOut = 30_000)
236 public void testTransactionStartedBeforeFenceAborts() throws Exception {
237
238 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
239
240 long fenceID = tsoClient.getFence(c1.getTableId()).get();
241
242 assertTrue(fenceID > startTsTx1, "Fence ID should be higher thank Tx1ID");
243
244 try {
245 tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
246 Assert.fail("TX should fail on commit");
247 } catch (ExecutionException ee) {
248 assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
249 }
250 }
251
252 @Test(timeOut = 30_000)
253 public void testTransactionStartedBeforeNonOverlapFenceCommits() throws Exception {
254
255 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
256
257 tsoClient.getFence(7).get();
258
259 try {
260 tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
261 } catch (ExecutionException ee) {
262 Assert.fail("TX should successfully commit"); }
263 }
264
265 @Test(timeOut = 30_000)
266 public void testTransactionStartedAfterFenceCommits() throws Exception {
267
268 tsoClient.getFence(c1.getTableId()).get();
269
270 long startTsTx1 = tsoClient.getNewStartTimestamp().get();
271
272 try {
273 tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
274 } catch (ExecutionException ee) {
275 Assert.fail("TX should successfully commit");
276 }
277 }
278
279 @Test(timeOut = 30_000)
280 public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
281 long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
282 long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
283 long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
284
285 Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
286 try {
287 tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
288 Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
289 } catch (ExecutionException ee) {
290 assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
291 }
292 long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
293
294 assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
295 if (!tsoClient.isLowLatency())
296 commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
297 assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
298 assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
299 }
300
301 @Test(timeOut = 30_000)
302 public void testLowWaterMarksgetAdvanced() throws ExecutionException, InterruptedException {
303
304 long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
305 HashSet<DummyCellIdImpl> ws = new HashSet<>();
306 for (int i=0; i< 1000*32; ++i) {
307 ws.add(new DummyCellIdImpl(i));
308 }
309
310 Long beforeCommitLWM = commitTableClient.readLowWatermark().get();
311
312 Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, ws).get();
313
314 Thread.sleep(300);
315
316 Long afterCommitLWM = commitTableClient.readLowWatermark().get();
317
318 assert(afterCommitLWM > beforeCommitLWM);
319 }
320
321 }