View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
22  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
23  import com.google.inject.Guice;
24  import com.google.inject.Injector;
25  import com.google.inject.Module;
26  
27  import org.apache.omid.TestUtils;
28  import org.apache.omid.committable.CommitTable;
29  import org.apache.omid.tso.LowWatermarkWriter;
30  import org.apache.omid.tso.TSOMockModule;
31  import org.apache.omid.tso.TSOServer;
32  import org.apache.omid.tso.TSOServerConfig;
33  import org.apache.omid.tso.util.DummyCellIdImpl;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.testng.Assert;
37  import org.testng.annotations.AfterClass;
38  import org.testng.annotations.BeforeClass;
39  import org.testng.annotations.Test;
40  
41  import java.util.HashSet;
42  import java.util.Set;
43  import java.util.concurrent.ExecutionException;
44  
45  import static org.mockito.Matchers.any;
46  import static org.mockito.Mockito.*;
47  import static org.testng.Assert.assertEquals;
48  import static org.testng.Assert.assertFalse;
49  import static org.testng.Assert.assertNotNull;
50  import static org.testng.Assert.assertTrue;
51  
52  public class TestIntegrationOfTSOClientServerBasicFunctionality {
53  
54      private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);
55  
56      private static final String TSO_SERVER_HOST = "localhost";
57      private int tsoServerPortForTest;
58  
59      // Cells for tests
60      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
61      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
62  
63      // Required infrastructure for TSO tsoClient-server integration testing
64      private TSOServer tsoServer;
65      private TSOClient tsoClient;
66      private TSOClient justAnotherTSOClient;
67      private CommitTable.Client commitTableClient;
68      private LowWatermarkWriter lowWatermarkWriter;
69  
70      @BeforeClass
71      public void setup() throws Exception {
72  
73          tsoServerPortForTest = TestUtils.getFreeLocalPort();
74  
75          TSOServerConfig tsoConfig = new TSOServerConfig();
76          tsoConfig.setConflictMapSize(1000);
77          tsoConfig.setPort(tsoServerPortForTest);
78          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
79          Injector injector = Guice.createInjector(tsoServerMockModule);
80  
81          lowWatermarkWriter = injector.getInstance(LowWatermarkWriter.class);
82  
83          CommitTable commitTable = injector.getInstance(CommitTable.class);
84          commitTableClient = commitTable.getClient();
85  
86          LOG.info("==================================================================================================");
87          LOG.info("======================================= Init TSO Server ==========================================");
88          LOG.info("==================================================================================================");
89  
90          tsoServer = injector.getInstance(TSOServer.class);
91          tsoServer.startAsync();
92          tsoServer.awaitRunning();
93          TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);
94  
95          LOG.info("==================================================================================================");
96          LOG.info("===================================== TSO Server Initialized =====================================");
97          LOG.info("==================================================================================================");
98  
99          LOG.info("==================================================================================================");
100         LOG.info("======================================= Setup TSO Clients ========================================");
101         LOG.info("==================================================================================================");
102 
103         // Configure direct connection to the server
104         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
105         tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);
106 
107         tsoClient = TSOClient.newInstance(tsoClientConf);
108         justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);
109 
110         LOG.info("==================================================================================================");
111         LOG.info("===================================== TSO Clients Initialized ====================================");
112         LOG.info("==================================================================================================");
113 
114         Thread.currentThread().setName("Test Thread");
115 
116     }
117 
118     @AfterClass
119     public void tearDown() throws Exception {
120 
121         tsoClient.close().get();
122 
123         tsoServer.stopAsync();
124         tsoServer.awaitTerminated();
125         tsoServer = null;
126         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);
127 
128     }
129 
130     @Test(timeOut = 30_000)
131     public void testTimestampsOrderingGrowMonotonically() throws Exception {
132         long referenceTimestamp;
133         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
134         referenceTimestamp = startTsTx1;
135 
136         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
137         referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
138         assertTrue(startTsTx2 >= referenceTimestamp, "Should grow monotonically");
139         assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");
140 
141         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
142         referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
143         assertTrue(commitTsTx2 >= referenceTimestamp, "Should grow monotonically");
144 
145         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
146         referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
147         assertTrue(commitTsTx1 >= referenceTimestamp, "Should grow monotonically");
148 
149         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
150         referenceTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
151         assertTrue(startTsTx3 >= referenceTimestamp, "Should grow monotonically");
152     }
153 
154     @Test(timeOut = 30_000)
155     public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
156         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
157         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
158         assertTrue(commitTsTx1 > startTsTx1);
159     }
160 
161     @Test(timeOut = 30_000)
162     public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
163         long startTs = tsoClient.getNewStartTimestamp().get();
164 
165         Set<CellId> cells = new HashSet<>();
166         for (int i = 0; i < 1_000_000; i++) {
167             cells.add(new DummyCellIdImpl(i));
168         }
169 
170         long commitTs = tsoClient.commit(startTs, cells).get();
171         assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
172     }
173 
174     @Test(timeOut = 30_000)
175     public void testMultipleSerialCommitsDoNotConflict() throws Exception {
176         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
177         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
178         assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");
179 
180         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
181         assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");
182 
183         long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
184         assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");
185 
186         long startTsTx3 = tsoClient.getNewStartTimestamp().get();
187         long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
188         assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
189     }
190 
191     @Test(timeOut = 30_000)
192     public void testCommitWritesToCommitTable() throws Exception {
193 
194         long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
195         long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
196         assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
197 
198         if (!tsoClient.isLowLatency())
199             assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
200                     "Commit TS for Tx1 shouldn't appear in Commit Table");
201 
202         long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
203         assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
204 
205         if (!tsoClient.isLowLatency()) {
206             Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
207             assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
208             assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
209                     "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
210             assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
211         } else {
212             assertTrue(commitTsForTx1 > startTsForTx2, "Commit TS should be higher than tx's Start TS");
213         }
214 
215 
216     }
217 
218     @Test(timeOut = 30_000)
219     public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
220         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
221         long startTsTx2 = tsoClient.getNewStartTimestamp().get();
222         assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");
223 
224         long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
225         assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");
226 
227         try {
228             tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
229             Assert.fail("Second TX should fail on commit");
230         } catch (ExecutionException ee) {
231             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
232         }
233     }
234 
235     @Test(timeOut = 30_000)
236     public void testTransactionStartedBeforeFenceAborts() throws Exception {
237 
238         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
239 
240         long fenceID = tsoClient.getFence(c1.getTableId()).get();
241 
242         assertTrue(fenceID > startTsTx1, "Fence ID should be higher thank Tx1ID");
243 
244         try {
245             tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
246             Assert.fail("TX should fail on commit");
247         } catch (ExecutionException ee) {
248             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
249         }
250     }
251 
252     @Test(timeOut = 30_000)
253     public void testTransactionStartedBeforeNonOverlapFenceCommits() throws Exception {
254 
255         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
256 
257         tsoClient.getFence(7).get();
258 
259         try {
260             tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
261         } catch (ExecutionException ee) {
262             Assert.fail("TX should successfully commit");        }
263     }
264 
265     @Test(timeOut = 30_000)
266     public void testTransactionStartedAfterFenceCommits() throws Exception {
267 
268         tsoClient.getFence(c1.getTableId()).get();
269 
270         long startTsTx1 = tsoClient.getNewStartTimestamp().get();
271 
272         try {
273             tsoClient.commit(startTsTx1, Sets.newHashSet(c1, c2)).get();
274         } catch (ExecutionException ee) {
275             Assert.fail("TX should successfully commit");
276         }
277     }
278 
279     @Test(timeOut = 30_000)
280     public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
281         long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
282         long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
283         long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
284 
285         Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
286         try {
287             tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
288             Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
289         } catch (ExecutionException ee) {
290             assertEquals(AbortException.class, ee.getCause().getClass(), "Should have aborted");
291         }
292         long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
293 
294         assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
295         if (!tsoClient.isLowLatency())
296             commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
297         assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
298         assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
299     }
300 
301     @Test(timeOut = 30_000)
302     public void testLowWaterMarksgetAdvanced() throws ExecutionException, InterruptedException {
303 
304         long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
305         HashSet<DummyCellIdImpl> ws = new HashSet<>();
306         for (int i=0; i< 1000*32; ++i) {
307             ws.add(new DummyCellIdImpl(i));
308         }
309 
310         Long beforeCommitLWM = commitTableClient.readLowWatermark().get();
311 
312         Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, ws).get();
313 
314         Thread.sleep(300);
315 
316         Long afterCommitLWM = commitTableClient.readLowWatermark().get();
317 
318         assert(afterCommitLWM > beforeCommitLWM);
319     }
320 
321 }