View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.timestamp.storage;
19  
20  import org.apache.curator.RetryPolicy;
21  import org.apache.curator.framework.CuratorFramework;
22  import org.apache.curator.framework.CuratorFrameworkFactory;
23  import org.apache.curator.retry.ExponentialBackoffRetry;
24  import org.apache.curator.test.TestingServer;
25  import org.apache.curator.utils.CloseableUtils;
26  import org.apache.zookeeper.ZooKeeper;
27  import org.apache.zookeeper.data.Stat;
28  import org.mockito.Mockito;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.testng.annotations.AfterMethod;
32  import org.testng.annotations.BeforeMethod;
33  import org.testng.annotations.Test;
34  
35  import java.io.IOException;
36  import java.net.BindException;
37  
38  import static org.apache.omid.timestamp.storage.ZKTimestampPaths.TIMESTAMP_ZNODE;
39  import static org.apache.omid.timestamp.storage.ZKTimestampStorage.INITIAL_MAX_TS_VALUE;
40  import static org.mockito.Mockito.doThrow;
41  import static org.testng.Assert.assertEquals;
42  import static org.testng.Assert.fail;
43  
44  public class TestZKTimestampStorage {
45  
46      private static final Logger LOG = LoggerFactory.getLogger(TestZKTimestampStorage.class);
47  
48      private static final int BYTES_IN_LONG = 8;
49  
50      private static final int ZK_PORT = 16666;
51      private static final String ZK_CLUSTER = "localhost:" + ZK_PORT;
52  
53      private static final long NEGATIVE_TS = -1;
54  
55      private static final int ITERATION_COUNT = 10;
56  
57      private TestingServer zkServer;
58  
59      private CuratorFramework zkClient;
60  
61      private ZKTimestampStorage storage;
62  
63      private CuratorFramework storageInternalZKClient;
64  
65      @BeforeMethod
66      public void initStuff() throws Exception {
67          LOG.info("Creating ZK server instance listening in port {}...", ZK_PORT);
68          while (zkServer == null) {
69              try {
70                  zkServer = new TestingServer(ZK_PORT);
71              } catch (BindException e) {
72                  System.err.println("Getting bind exception - retrying to allocate server");
73                  zkServer = null;
74              }
75          }
76          LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
77  
78          RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
79  
80          LOG.info("Creating Zookeeper Client connected to {}", ZK_CLUSTER);
81          zkClient = CuratorFrameworkFactory.builder()
82                  .namespace("omid")
83                  .connectString(ZK_CLUSTER)
84                  .retryPolicy(retryPolicy)
85                  .connectionTimeoutMs(10) // Low timeout for tests
86                  .build();
87          zkClient.start();
88          zkClient.blockUntilConnected();
89  
90          LOG.info("Creating Internal Zookeeper Client connected to {}", ZK_CLUSTER);
91          storageInternalZKClient = Mockito.spy(CuratorFrameworkFactory.builder()
92                                                        .namespace("omid")
93                                                        .connectString(ZK_CLUSTER)
94                                                        .retryPolicy(retryPolicy)
95                                                        .connectionTimeoutMs(10) // Low timeout for tests
96                                                        .build());
97          storageInternalZKClient.start();
98          storageInternalZKClient.blockUntilConnected();
99  
100         storage = new ZKTimestampStorage(storageInternalZKClient);
101     }
102 
103     @AfterMethod
104     public void closeStuff() throws Exception {
105 
106         CloseableUtils.closeQuietly(zkClient);
107         LOG.info("ZK Client state {}", zkClient.getState());
108         zkClient = null;
109 
110         CloseableUtils.closeQuietly(storageInternalZKClient);
111         LOG.info("ZK Internal Client state {}", storageInternalZKClient.getState());
112         storageInternalZKClient = null;
113 
114         CloseableUtils.closeQuietly(zkServer);
115         LOG.info("ZK Server Stopped");
116         zkServer = null;
117 
118     }
119 
120     @Test(timeOut = 10_000)
121     public void testBasicFunctionality() throws Exception {
122 
123         // Check ZNode for timestamp exists (storage instantiation should create it)
124         Stat zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
125         assertEquals(zNodeStats.getVersion(), 0);
126 
127         // Initial checks
128         assertEquals(storage.getMaxTimestamp(), INITIAL_MAX_TS_VALUE);
129         byte[] data = zkClient.getData().forPath(TIMESTAMP_ZNODE);
130         assertEquals(data.length, BYTES_IN_LONG);
131 
132         // Check new timestamp does not allow negative values...
133         try {
134             storage.updateMaxTimestamp(INITIAL_MAX_TS_VALUE, NEGATIVE_TS);
135             fail();
136         } catch (IllegalArgumentException e) {
137             // Expected exception
138         }
139 
140         // ...nor is less than previous timestamp
141         try {
142             storage.updateMaxTimestamp(1, 0);
143             fail();
144         } catch (IllegalArgumentException e) {
145             // Expected exception
146         }
147 
148         // Check that the original version is still there
149         zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
150         assertEquals(zNodeStats.getVersion(), 0);
151 
152         // Iterate updating the timestamp and check the final value
153         long previousMaxTimestamp = INITIAL_MAX_TS_VALUE;
154         for (int i = 0; i < ITERATION_COUNT; i++) {
155             long newMaxTimestamp = previousMaxTimestamp + 1_000_000;
156             storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
157             previousMaxTimestamp = newMaxTimestamp;
158         }
159         assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
160         // Check the znode version has changed accordingly
161         zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
162         assertEquals(zNodeStats.getVersion(), ITERATION_COUNT);
163 
164         // Check exceptions
165         doThrow(new RuntimeException()).when(storageInternalZKClient).getData();
166         try {
167             storage.getMaxTimestamp();
168             fail();
169         } catch (IOException e) {
170             // Expected exception
171         }
172 
173         doThrow(new RuntimeException()).when(storageInternalZKClient).setData();
174         try {
175             storage.updateMaxTimestamp(INITIAL_MAX_TS_VALUE, INITIAL_MAX_TS_VALUE + 1_000_000);
176             fail();
177         } catch (IOException e) {
178             // Expected exception
179         }
180 
181         // Reset the mock and double-check last result
182         Mockito.reset(storageInternalZKClient);
183         assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
184 
185         // Finally check the znode version is still the same
186         zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
187         assertEquals(zNodeStats.getVersion(), ITERATION_COUNT);
188     }
189 
190     @Test(timeOut = 20_000)
191     public void testZkClientWhenZKIsDownAndRestarts() throws Exception {
192 
193         // Iterate updating the timestamp and check the final value
194         long previousMaxTimestamp = INITIAL_MAX_TS_VALUE;
195         for (int i = 0; i < ITERATION_COUNT; i++) {
196             long newMaxTimestamp = previousMaxTimestamp + 1_000_000;
197             storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
198             previousMaxTimestamp = newMaxTimestamp;
199         }
200         assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
201 
202         // Stop ZK Server, expect the IO exception, reconnect and get the right value
203         LOG.info("Stopping ZK Server");
204         zkServer.stop();
205         LOG.info("ZK Server Stopped");
206 
207         try {
208             storage.getMaxTimestamp();
209             fail();
210         } catch (IOException ioe) {
211             // Expected exception
212         }
213 
214         LOG.info("Restarting ZK again");
215         zkServer.restart();
216         assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
217 
218     }
219 
220     @Test(timeOut = 10_000)
221     public void testZkClientLosingSession() throws Exception {
222 
223         // Cut the session in the server through the client
224         long sessionId = zkClient.getZookeeperClient().getZooKeeper().getSessionId();
225         byte[] sessionPasswd = zkClient.getZookeeperClient().getZooKeeper().getSessionPasswd();
226         ZooKeeper zk = new ZooKeeper(ZK_CLUSTER, 1000, null, sessionId, sessionPasswd);
227         zk.close();
228         LOG.info("ZKClient session closed");
229 
230         // Iterate updating the timestamp and check the final value
231         long previousMaxTimestamp = INITIAL_MAX_TS_VALUE;
232         for (int i = 0; i < ITERATION_COUNT; i++) {
233             long newMaxTimestamp = previousMaxTimestamp + 1_000_000;
234             storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
235             LOG.info("Updating timestamp. Previous/New {}/{}", previousMaxTimestamp, newMaxTimestamp);
236             previousMaxTimestamp = newMaxTimestamp;
237         }
238         assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
239 
240     }
241 
242 }