View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import static org.apache.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
21  import static org.mockito.Matchers.any;
22  import static org.mockito.Matchers.anyString;
23  import static org.mockito.Mockito.mock;
24  import static org.mockito.Mockito.reset;
25  import static org.mockito.Mockito.spy;
26  import static org.mockito.Mockito.timeout;
27  import static org.mockito.Mockito.times;
28  import static org.mockito.Mockito.verify;
29  import static org.mockito.Mockito.when;
30  import static org.testng.Assert.assertEquals;
31  import static org.testng.Assert.assertFalse;
32  import static org.testng.Assert.assertTrue;
33  
34  import java.io.IOException;
35  
36  import org.apache.curator.framework.CuratorFramework;
37  import org.apache.curator.test.TestingServer;
38  import org.apache.curator.utils.CloseableUtils;
39  import org.apache.omid.TestUtils;
40  import org.apache.omid.tso.TSOStateManager.TSOState;
41  import org.mockito.ArgumentCaptor;
42  import org.mockito.Mock;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  import org.testng.annotations.AfterClass;
46  import org.testng.annotations.BeforeClass;
47  import org.testng.annotations.Test;
48  
49  import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
50  
51  public class TestLeaseManager {
52  
53      private static final long DUMMY_EPOCH_1 = 1L;
54      private static final long DUMMY_EPOCH_2 = 2L;
55      private static final long DUMMY_EPOCH_3 = 3L;
56      private static final long DUMMY_LOW_WATERMARK_1 = DUMMY_EPOCH_1;
57      private static final long DUMMY_LOW_WATERMARK_2 = DUMMY_EPOCH_2;
58      private static final long DUMMY_LOW_WATERMARK_3 = DUMMY_EPOCH_3;
59  
60      private static final String LEASE_MGR_ID_1 = "LM1";
61      private static final String LEASE_MGR_ID_2 = "LM2";
62      private static final String INSTANCE_ID_1 = "LM1" + "#";
63      private static final String INSTANCE_ID_2 = "LM2" + "#";
64  
65      private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
66  
67      private static final long TEST_LEASE_PERIOD_IN_MS = 5000; // 5 seconds
68  
69      private CuratorFramework zkClient;
70      private TestingServer zkServer;
71  
72      @Mock
73      private Panicker panicker;
74  
75      private PausableLeaseManager leaseManager1;
76      private PausableLeaseManager leaseManager2;
77  
78      @BeforeClass
79      public void beforeClass() throws Exception {
80  
81          LOG.info("Starting ZK Server");
82          zkServer = TestUtils.provideTestingZKServer();
83          LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
84  
85          zkClient = TestUtils.provideConnectedZKClient(DEFAULT_ZK_CLUSTER);
86  
87      }
88  
89      @AfterClass
90      public void afterClass() throws Exception {
91  
92          zkClient.close();
93  
94          CloseableUtils.closeQuietly(zkServer);
95          zkServer = null;
96          LOG.info("ZK Server Stopped");
97  
98      }
99  
100     @Test(timeOut = 80_000)
101     public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
102 
103         final String TEST_TSO_LEASE_PATH = "/test0_tsolease";
104         final String TEST_CURRENT_TSO_PATH = "/test0_currenttso";
105 
106         Panicker panicker = spy(new MockPanicker());
107 
108         TSOChannelHandler tsoChannelHandler = mock(TSOChannelHandler.class);
109         TSOStateManager stateManager = mock(TSOStateManager.class);
110         when(stateManager.initialize()).thenThrow(new IOException());
111         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
112                                                  tsoChannelHandler,
113                                                  stateManager,
114                                                  TEST_LEASE_PERIOD_IN_MS,
115                                                  TEST_TSO_LEASE_PATH,
116                                                  TEST_CURRENT_TSO_PATH,
117                                                  zkClient,
118                                                  panicker);
119         leaseManager1.startService();
120 
121         // ... let the test run for some time...
122         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
123 
124         verify(panicker, timeout(2000).atLeastOnce()).panic(anyString(), any(IOException.class));
125 
126         leaseManager1.stopService();
127 
128     }
129 
130     @Test(timeOut = 80_000)
131     public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance() throws Exception {
132 
133         final String TEST_TSO_LEASE_PATH = "/test1_tsolease";
134         final String TEST_CURRENT_TSO_PATH = "/test1_currenttso";
135 
136         // Launch the instance under test...
137         TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
138         TSOStateManager stateManager1 = mock(TSOStateManager.class);
139         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
140         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
141                                                  tsoChannelHandler1,
142                                                  stateManager1,
143                                                  TEST_LEASE_PERIOD_IN_MS,
144                                                  TEST_TSO_LEASE_PATH,
145                                                  TEST_CURRENT_TSO_PATH,
146                                                  zkClient,
147                                                  panicker);
148         leaseManager1.startService();
149 
150         // ... let the test run for some time...
151         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
152 
153         // ... check is the lease holder
154         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
155         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
156         assertTrue(leaseManager1.stillInLeasePeriod());
157 
158         // Then, pause instance when trying to renew lease...
159         leaseManager1.pausedInTryToRenewLeasePeriod();
160 
161         // ...let the test run for some time...
162         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
163 
164         // ...check that nothing changed...
165         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
166         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
167 
168         // Finally, resume the instance...
169         leaseManager1.resume();
170 
171         // ... let the test run for some time...
172         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
173 
174         // ... and check again that nothing changed
175         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
176         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
177         assertFalse(leaseManager1.stillInLeasePeriod()); // Must not be master as it should have triggered and exception
178 
179     }
180 
181     @Test(timeOut = 80_000)
182     public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
183 
184         final String TEST_TSO_LEASE_PATH = "/test2_tsolease";
185         final String TEST_CURRENT_TSO_PATH = "/test2_currenttso";
186 
187         // Launch the master instance...
188         TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
189         TSOStateManager stateManager1 = mock(TSOStateManager.class);
190         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
191         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
192                                                  tsoChannelHandler1,
193                                                  stateManager1,
194                                                  TEST_LEASE_PERIOD_IN_MS,
195                                                  TEST_TSO_LEASE_PATH,
196                                                  TEST_CURRENT_TSO_PATH,
197                                                  zkClient,
198                                                  panicker);
199 
200         leaseManager1.startService();
201 
202         // ...let the test run for some time...
203         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
204 
205         // ...so it should be the current holder of the lease
206         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
207         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
208         assertTrue(leaseManager1.stillInLeasePeriod());
209 
210         // Then launch another instance...
211         TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
212         TSOStateManager stateManager2 = mock(TSOStateManager.class);
213         when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
214         leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
215                                                  tsoChannelHandler2,
216                                                  stateManager2,
217                                                  TEST_LEASE_PERIOD_IN_MS,
218                                                  TEST_TSO_LEASE_PATH,
219                                                  TEST_CURRENT_TSO_PATH,
220                                                  zkClient,
221                                                  panicker);
222         leaseManager2.startService();
223 
224         // ... let the test run for some time...
225         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
226 
227         // ... and after the period, the first instance should be still the holder
228         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
229         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
230         assertTrue(leaseManager1.stillInLeasePeriod());
231         assertFalse(leaseManager2.stillInLeasePeriod());
232     }
233 
234     @Test(timeOut = 80_000)
235     public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
236 
237         final String TEST_TSO_LEASE_PATH = "/test3_tsolease";
238         final String TEST_CURRENT_TSO_PATH = "/test3_currenttso";
239 
240         // Launch the master instance...
241         TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
242         TSOStateManager stateManager1 = mock(TSOStateManager.class);
243         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
244         leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
245                                                  tsoChannelHandler1,
246                                                  stateManager1,
247                                                  TEST_LEASE_PERIOD_IN_MS,
248                                                  TEST_TSO_LEASE_PATH,
249                                                  TEST_CURRENT_TSO_PATH,
250                                                  zkClient,
251                                                  panicker);
252 
253         leaseManager1.startService();
254 
255         // ... let the test run for some time...
256         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
257 
258         // ... so it should be the current holder of the lease
259         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
260         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
261         assertTrue(leaseManager1.stillInLeasePeriod());
262 
263         // Then launch another instance...
264         TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
265         TSOStateManager stateManager2 = mock(TSOStateManager.class);
266         when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
267         leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
268                                                  tsoChannelHandler2,
269                                                  stateManager2,
270                                                  TEST_LEASE_PERIOD_IN_MS,
271                                                  TEST_TSO_LEASE_PATH,
272                                                  TEST_CURRENT_TSO_PATH,
273                                                  zkClient,
274                                                  panicker);
275         leaseManager2.startService();
276 
277         // ... and pause active lease manager...
278         leaseManager1.pausedInStillInLeasePeriod();
279 
280         // ... and let the test run for some time...
281         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
282 
283         // ... and check that lease owner should have changed to the second instance
284         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
285         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
286         assertTrue(leaseManager2.stillInLeasePeriod());
287 
288         // Now, lets resume the first instance...
289         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_3, DUMMY_EPOCH_3));
290         leaseManager1.resume();
291 
292         // ... let the test run for some time...
293         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
294 
295         // and check the lease owner is still the second instance (preserves the lease)
296         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
297         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
298         assertFalse(leaseManager1.stillInLeasePeriod());
299         assertTrue(leaseManager2.stillInLeasePeriod());
300 
301         // Finally, pause active lease manager when trying to renew lease...
302         leaseManager2.pausedInTryToRenewLeasePeriod();
303 
304         // ... let the test run for some time...
305         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
306 
307         // ... and check lease owner is has changed again to the first instance
308         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
309         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
310         assertFalse(leaseManager2.stillInLeasePeriod());
311         assertTrue(leaseManager1.stillInLeasePeriod());
312 
313         // Resume the second instance...
314         leaseManager2.resume();
315 
316         // ... let the test run for some time...
317         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
318 
319         // ... but the lease owner should still be the first instance
320         checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
321         checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
322         assertFalse(leaseManager2.stillInLeasePeriod());
323         assertTrue(leaseManager1.stillInLeasePeriod());
324 
325     }
326 
327 
328     @Test(timeOut = 80_000)
329     public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
330 
331         final String TEST_TSO_LEASE_PATH = "/test_wronginfo_tsolease";
332         final String TEST_CURRENT_TSO_PATH = "/test_wronginfo_currenttso";
333 
334         Panicker panicker = spy(new MockPanicker());
335 
336         // Launch the master instance...
337         TSOStateManager stateManager1 = mock(TSOStateManager.class);
338         when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
339         PausableLeaseManager leaseManager = new PausableLeaseManager(LEASE_MGR_ID_1,
340                                                                      mock(TSOChannelHandler.class),
341                                                                      stateManager1,
342                                                                      TEST_LEASE_PERIOD_IN_MS,
343                                                                      TEST_TSO_LEASE_PATH,
344                                                                      TEST_CURRENT_TSO_PATH,
345                                                                      zkClient,
346                                                                      panicker);
347 
348         leaseManager.startService();
349         // ...and let the test run for some time...
350         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
351 
352         leaseManager.pausedInTryToRenewLeasePeriod();
353 
354         // 1st Panic test) Inject corrupted data in the ZNode, force reelection and test the panicker is exercised
355         zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "CorruptedData!!!".getBytes());
356 
357         // ...and let the test run for some time...
358         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
359         leaseManager.resume();
360         // ...and let the test run for some time...
361         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
362 
363         ArgumentCaptor<IllegalArgumentException> trowableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
364         verify(panicker, times(2)).panic(anyString(), trowableIAE.capture());
365         assertTrue(trowableIAE.getValue() != null);
366         assertTrue(trowableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
367 
368         // 2nd Panic test) Simulate that a new master appeared in the meantime, force reelection
369         // and test the panicker is exercised
370         reset(panicker);
371         zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "newTSO:12345#10000".getBytes());
372 
373         leaseManager.pausedInTryToRenewLeasePeriod();
374 
375         // ...and let the test run for some time...
376         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
377         leaseManager.resume();
378         // ...and let the test run for some time...
379         Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
380 
381         ArgumentCaptor<LeaseManagement.LeaseManagementException> trowableLME =
382                 ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
383         verify(panicker, times(2)).panic(anyString(), trowableLME.capture());
384         assertTrue(trowableLME.getValue() != null);
385         assertTrue(trowableLME.getValue().getMessage().contains("Another TSO replica was found"));
386     }
387 
388     @Test(timeOut = 1000)
389     public void testNonHALeaseManager() throws Exception {
390 
391         // Launch the instance...
392         VoidLeaseManager leaseManager = new VoidLeaseManager(mock(TSOChannelHandler.class),
393                                                              mock(TSOStateManager.class));
394 
395         leaseManager.startService();
396         assertTrue(leaseManager.stillInLeasePeriod());
397         leaseManager.stopService();
398 
399     }
400 
401     // ----------------------------------------------------------------------------------------------------------------
402     // Checkers
403     // ----------------------------------------------------------------------------------------------------------------
404 
405     private void checkLeaseHolder(String tsoLeasePath, String expectedLeaseHolder) throws Exception {
406         byte[] leaseHolderInBytes = zkClient.getData().forPath(tsoLeasePath);
407         String leaseHolder = new String(leaseHolderInBytes, Charsets.UTF_8);
408 
409         assertEquals(leaseHolder, expectedLeaseHolder);
410     }
411 
412     private void checkInstanceId(String currentTSOPath, String expectedInstanceId) throws Exception {
413         byte[] expectedInstanceIdInBytes = zkClient.getData().forPath(currentTSOPath);
414         String instanceId = new String(expectedInstanceIdInBytes, Charsets.UTF_8);
415 
416         assertEquals(instanceId, expectedInstanceId);
417     }
418 
419 }