1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import static org.apache.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Matchers.anyString;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.reset;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.timeout;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.when;
30 import static org.testng.Assert.assertEquals;
31 import static org.testng.Assert.assertFalse;
32 import static org.testng.Assert.assertTrue;
33
34 import java.io.IOException;
35
36 import org.apache.curator.framework.CuratorFramework;
37 import org.apache.curator.test.TestingServer;
38 import org.apache.curator.utils.CloseableUtils;
39 import org.apache.omid.TestUtils;
40 import org.apache.omid.tso.TSOStateManager.TSOState;
41 import org.mockito.ArgumentCaptor;
42 import org.mockito.Mock;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.testng.annotations.AfterClass;
46 import org.testng.annotations.BeforeClass;
47 import org.testng.annotations.Test;
48
49 import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
50
51 public class TestLeaseManager {
52
53 private static final long DUMMY_EPOCH_1 = 1L;
54 private static final long DUMMY_EPOCH_2 = 2L;
55 private static final long DUMMY_EPOCH_3 = 3L;
56 private static final long DUMMY_LOW_WATERMARK_1 = DUMMY_EPOCH_1;
57 private static final long DUMMY_LOW_WATERMARK_2 = DUMMY_EPOCH_2;
58 private static final long DUMMY_LOW_WATERMARK_3 = DUMMY_EPOCH_3;
59
60 private static final String LEASE_MGR_ID_1 = "LM1";
61 private static final String LEASE_MGR_ID_2 = "LM2";
62 private static final String INSTANCE_ID_1 = "LM1" + "#";
63 private static final String INSTANCE_ID_2 = "LM2" + "#";
64
65 private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
66
67 private static final long TEST_LEASE_PERIOD_IN_MS = 5000;
68
69 private CuratorFramework zkClient;
70 private TestingServer zkServer;
71
72 @Mock
73 private Panicker panicker;
74
75 private PausableLeaseManager leaseManager1;
76 private PausableLeaseManager leaseManager2;
77
78 @BeforeClass
79 public void beforeClass() throws Exception {
80
81 LOG.info("Starting ZK Server");
82 zkServer = TestUtils.provideTestingZKServer();
83 LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
84
85 zkClient = TestUtils.provideConnectedZKClient(DEFAULT_ZK_CLUSTER);
86
87 }
88
89 @AfterClass
90 public void afterClass() throws Exception {
91
92 zkClient.close();
93
94 CloseableUtils.closeQuietly(zkServer);
95 zkServer = null;
96 LOG.info("ZK Server Stopped");
97
98 }
99
100 @Test(timeOut = 80_000)
101 public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
102
103 final String TEST_TSO_LEASE_PATH = "/test0_tsolease";
104 final String TEST_CURRENT_TSO_PATH = "/test0_currenttso";
105
106 Panicker panicker = spy(new MockPanicker());
107
108 TSOChannelHandler tsoChannelHandler = mock(TSOChannelHandler.class);
109 TSOStateManager stateManager = mock(TSOStateManager.class);
110 when(stateManager.initialize()).thenThrow(new IOException());
111 leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
112 tsoChannelHandler,
113 stateManager,
114 TEST_LEASE_PERIOD_IN_MS,
115 TEST_TSO_LEASE_PATH,
116 TEST_CURRENT_TSO_PATH,
117 zkClient,
118 panicker);
119 leaseManager1.startService();
120
121
122 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
123
124 verify(panicker, timeout(2000).atLeastOnce()).panic(anyString(), any(IOException.class));
125
126 leaseManager1.stopService();
127
128 }
129
130 @Test(timeOut = 80_000)
131 public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance() throws Exception {
132
133 final String TEST_TSO_LEASE_PATH = "/test1_tsolease";
134 final String TEST_CURRENT_TSO_PATH = "/test1_currenttso";
135
136
137 TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
138 TSOStateManager stateManager1 = mock(TSOStateManager.class);
139 when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
140 leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
141 tsoChannelHandler1,
142 stateManager1,
143 TEST_LEASE_PERIOD_IN_MS,
144 TEST_TSO_LEASE_PATH,
145 TEST_CURRENT_TSO_PATH,
146 zkClient,
147 panicker);
148 leaseManager1.startService();
149
150
151 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
152
153
154 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
155 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
156 assertTrue(leaseManager1.stillInLeasePeriod());
157
158
159 leaseManager1.pausedInTryToRenewLeasePeriod();
160
161
162 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
163
164
165 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
166 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
167
168
169 leaseManager1.resume();
170
171
172 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
173
174
175 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
176 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
177 assertFalse(leaseManager1.stillInLeasePeriod());
178
179 }
180
181 @Test(timeOut = 80_000)
182 public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
183
184 final String TEST_TSO_LEASE_PATH = "/test2_tsolease";
185 final String TEST_CURRENT_TSO_PATH = "/test2_currenttso";
186
187
188 TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
189 TSOStateManager stateManager1 = mock(TSOStateManager.class);
190 when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
191 leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
192 tsoChannelHandler1,
193 stateManager1,
194 TEST_LEASE_PERIOD_IN_MS,
195 TEST_TSO_LEASE_PATH,
196 TEST_CURRENT_TSO_PATH,
197 zkClient,
198 panicker);
199
200 leaseManager1.startService();
201
202
203 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
204
205
206 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
207 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
208 assertTrue(leaseManager1.stillInLeasePeriod());
209
210
211 TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
212 TSOStateManager stateManager2 = mock(TSOStateManager.class);
213 when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
214 leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
215 tsoChannelHandler2,
216 stateManager2,
217 TEST_LEASE_PERIOD_IN_MS,
218 TEST_TSO_LEASE_PATH,
219 TEST_CURRENT_TSO_PATH,
220 zkClient,
221 panicker);
222 leaseManager2.startService();
223
224
225 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
226
227
228 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
229 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
230 assertTrue(leaseManager1.stillInLeasePeriod());
231 assertFalse(leaseManager2.stillInLeasePeriod());
232 }
233
234 @Test(timeOut = 80_000)
235 public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
236
237 final String TEST_TSO_LEASE_PATH = "/test3_tsolease";
238 final String TEST_CURRENT_TSO_PATH = "/test3_currenttso";
239
240
241 TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
242 TSOStateManager stateManager1 = mock(TSOStateManager.class);
243 when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
244 leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
245 tsoChannelHandler1,
246 stateManager1,
247 TEST_LEASE_PERIOD_IN_MS,
248 TEST_TSO_LEASE_PATH,
249 TEST_CURRENT_TSO_PATH,
250 zkClient,
251 panicker);
252
253 leaseManager1.startService();
254
255
256 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
257
258
259 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
260 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
261 assertTrue(leaseManager1.stillInLeasePeriod());
262
263
264 TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
265 TSOStateManager stateManager2 = mock(TSOStateManager.class);
266 when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
267 leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
268 tsoChannelHandler2,
269 stateManager2,
270 TEST_LEASE_PERIOD_IN_MS,
271 TEST_TSO_LEASE_PATH,
272 TEST_CURRENT_TSO_PATH,
273 zkClient,
274 panicker);
275 leaseManager2.startService();
276
277
278 leaseManager1.pausedInStillInLeasePeriod();
279
280
281 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
282
283
284 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
285 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
286 assertTrue(leaseManager2.stillInLeasePeriod());
287
288
289 when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_3, DUMMY_EPOCH_3));
290 leaseManager1.resume();
291
292
293 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
294
295
296 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
297 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
298 assertFalse(leaseManager1.stillInLeasePeriod());
299 assertTrue(leaseManager2.stillInLeasePeriod());
300
301
302 leaseManager2.pausedInTryToRenewLeasePeriod();
303
304
305 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
306
307
308 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
309 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
310 assertFalse(leaseManager2.stillInLeasePeriod());
311 assertTrue(leaseManager1.stillInLeasePeriod());
312
313
314 leaseManager2.resume();
315
316
317 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
318
319
320 checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
321 checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
322 assertFalse(leaseManager2.stillInLeasePeriod());
323 assertTrue(leaseManager1.stillInLeasePeriod());
324
325 }
326
327
328 @Test(timeOut = 80_000)
329 public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
330
331 final String TEST_TSO_LEASE_PATH = "/test_wronginfo_tsolease";
332 final String TEST_CURRENT_TSO_PATH = "/test_wronginfo_currenttso";
333
334 Panicker panicker = spy(new MockPanicker());
335
336
337 TSOStateManager stateManager1 = mock(TSOStateManager.class);
338 when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
339 PausableLeaseManager leaseManager = new PausableLeaseManager(LEASE_MGR_ID_1,
340 mock(TSOChannelHandler.class),
341 stateManager1,
342 TEST_LEASE_PERIOD_IN_MS,
343 TEST_TSO_LEASE_PATH,
344 TEST_CURRENT_TSO_PATH,
345 zkClient,
346 panicker);
347
348 leaseManager.startService();
349
350 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
351
352 leaseManager.pausedInTryToRenewLeasePeriod();
353
354
355 zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "CorruptedData!!!".getBytes());
356
357
358 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
359 leaseManager.resume();
360
361 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
362
363 ArgumentCaptor<IllegalArgumentException> trowableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
364 verify(panicker, times(2)).panic(anyString(), trowableIAE.capture());
365 assertTrue(trowableIAE.getValue() != null);
366 assertTrue(trowableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
367
368
369
370 reset(panicker);
371 zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "newTSO:12345#10000".getBytes());
372
373 leaseManager.pausedInTryToRenewLeasePeriod();
374
375
376 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
377 leaseManager.resume();
378
379 Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
380
381 ArgumentCaptor<LeaseManagement.LeaseManagementException> trowableLME =
382 ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
383 verify(panicker, times(2)).panic(anyString(), trowableLME.capture());
384 assertTrue(trowableLME.getValue() != null);
385 assertTrue(trowableLME.getValue().getMessage().contains("Another TSO replica was found"));
386 }
387
388 @Test(timeOut = 1000)
389 public void testNonHALeaseManager() throws Exception {
390
391
392 VoidLeaseManager leaseManager = new VoidLeaseManager(mock(TSOChannelHandler.class),
393 mock(TSOStateManager.class));
394
395 leaseManager.startService();
396 assertTrue(leaseManager.stillInLeasePeriod());
397 leaseManager.stopService();
398
399 }
400
401
402
403
404
405 private void checkLeaseHolder(String tsoLeasePath, String expectedLeaseHolder) throws Exception {
406 byte[] leaseHolderInBytes = zkClient.getData().forPath(tsoLeasePath);
407 String leaseHolder = new String(leaseHolderInBytes, Charsets.UTF_8);
408
409 assertEquals(leaseHolder, expectedLeaseHolder);
410 }
411
412 private void checkInstanceId(String currentTSOPath, String expectedInstanceId) throws Exception {
413 byte[] expectedInstanceIdInBytes = zkClient.getData().forPath(currentTSOPath);
414 String instanceId = new String(expectedInstanceIdInBytes, Charsets.UTF_8);
415
416 assertEquals(instanceId, expectedInstanceId);
417 }
418
419 }