1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.timestamp.storage;
19
20 import org.apache.curator.RetryPolicy;
21 import org.apache.curator.framework.CuratorFramework;
22 import org.apache.curator.framework.CuratorFrameworkFactory;
23 import org.apache.curator.retry.ExponentialBackoffRetry;
24 import org.apache.curator.test.TestingServer;
25 import org.apache.curator.utils.CloseableUtils;
26 import org.apache.zookeeper.ZooKeeper;
27 import org.apache.zookeeper.data.Stat;
28 import org.mockito.Mockito;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.testng.annotations.AfterMethod;
32 import org.testng.annotations.BeforeMethod;
33 import org.testng.annotations.Test;
34
35 import java.io.IOException;
36 import java.net.BindException;
37
38 import static org.apache.omid.timestamp.storage.ZKTimestampPaths.TIMESTAMP_ZNODE;
39 import static org.apache.omid.timestamp.storage.ZKTimestampStorage.INITIAL_MAX_TS_VALUE;
40 import static org.mockito.Mockito.doThrow;
41 import static org.testng.Assert.assertEquals;
42 import static org.testng.Assert.fail;
43
44 public class TestZKTimestampStorage {
45
46 private static final Logger LOG = LoggerFactory.getLogger(TestZKTimestampStorage.class);
47
48 private static final int BYTES_IN_LONG = 8;
49
50 private static final int ZK_PORT = 16666;
51 private static final String ZK_CLUSTER = "localhost:" + ZK_PORT;
52
53 private static final long NEGATIVE_TS = -1;
54
55 private static final int ITERATION_COUNT = 10;
56
57 private TestingServer zkServer;
58
59 private CuratorFramework zkClient;
60
61 private ZKTimestampStorage storage;
62
63 private CuratorFramework storageInternalZKClient;
64
65 @BeforeMethod
66 public void initStuff() throws Exception {
67 LOG.info("Creating ZK server instance listening in port {}...", ZK_PORT);
68 while (zkServer == null) {
69 try {
70 zkServer = new TestingServer(ZK_PORT);
71 } catch (BindException e) {
72 System.err.println("Getting bind exception - retrying to allocate server");
73 zkServer = null;
74 }
75 }
76 LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
77
78 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
79
80 LOG.info("Creating Zookeeper Client connected to {}", ZK_CLUSTER);
81 zkClient = CuratorFrameworkFactory.builder()
82 .namespace("omid")
83 .connectString(ZK_CLUSTER)
84 .retryPolicy(retryPolicy)
85 .connectionTimeoutMs(10)
86 .build();
87 zkClient.start();
88 zkClient.blockUntilConnected();
89
90 LOG.info("Creating Internal Zookeeper Client connected to {}", ZK_CLUSTER);
91 storageInternalZKClient = Mockito.spy(CuratorFrameworkFactory.builder()
92 .namespace("omid")
93 .connectString(ZK_CLUSTER)
94 .retryPolicy(retryPolicy)
95 .connectionTimeoutMs(10)
96 .build());
97 storageInternalZKClient.start();
98 storageInternalZKClient.blockUntilConnected();
99
100 storage = new ZKTimestampStorage(storageInternalZKClient);
101 }
102
103 @AfterMethod
104 public void closeStuff() throws Exception {
105
106 CloseableUtils.closeQuietly(zkClient);
107 LOG.info("ZK Client state {}", zkClient.getState());
108 zkClient = null;
109
110 CloseableUtils.closeQuietly(storageInternalZKClient);
111 LOG.info("ZK Internal Client state {}", storageInternalZKClient.getState());
112 storageInternalZKClient = null;
113
114 CloseableUtils.closeQuietly(zkServer);
115 LOG.info("ZK Server Stopped");
116 zkServer = null;
117
118 }
119
120 @Test(timeOut = 10_000)
121 public void testBasicFunctionality() throws Exception {
122
123
124 Stat zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
125 assertEquals(zNodeStats.getVersion(), 0);
126
127
128 assertEquals(storage.getMaxTimestamp(), INITIAL_MAX_TS_VALUE);
129 byte[] data = zkClient.getData().forPath(TIMESTAMP_ZNODE);
130 assertEquals(data.length, BYTES_IN_LONG);
131
132
133 try {
134 storage.updateMaxTimestamp(INITIAL_MAX_TS_VALUE, NEGATIVE_TS);
135 fail();
136 } catch (IllegalArgumentException e) {
137
138 }
139
140
141 try {
142 storage.updateMaxTimestamp(1, 0);
143 fail();
144 } catch (IllegalArgumentException e) {
145
146 }
147
148
149 zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
150 assertEquals(zNodeStats.getVersion(), 0);
151
152
153 long previousMaxTimestamp = INITIAL_MAX_TS_VALUE;
154 for (int i = 0; i < ITERATION_COUNT; i++) {
155 long newMaxTimestamp = previousMaxTimestamp + 1_000_000;
156 storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
157 previousMaxTimestamp = newMaxTimestamp;
158 }
159 assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
160
161 zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
162 assertEquals(zNodeStats.getVersion(), ITERATION_COUNT);
163
164
165 doThrow(new RuntimeException()).when(storageInternalZKClient).getData();
166 try {
167 storage.getMaxTimestamp();
168 fail();
169 } catch (IOException e) {
170
171 }
172
173 doThrow(new RuntimeException()).when(storageInternalZKClient).setData();
174 try {
175 storage.updateMaxTimestamp(INITIAL_MAX_TS_VALUE, INITIAL_MAX_TS_VALUE + 1_000_000);
176 fail();
177 } catch (IOException e) {
178
179 }
180
181
182 Mockito.reset(storageInternalZKClient);
183 assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
184
185
186 zNodeStats = zkClient.checkExists().forPath(TIMESTAMP_ZNODE);
187 assertEquals(zNodeStats.getVersion(), ITERATION_COUNT);
188 }
189
190 @Test(timeOut = 20_000)
191 public void testZkClientWhenZKIsDownAndRestarts() throws Exception {
192
193
194 long previousMaxTimestamp = INITIAL_MAX_TS_VALUE;
195 for (int i = 0; i < ITERATION_COUNT; i++) {
196 long newMaxTimestamp = previousMaxTimestamp + 1_000_000;
197 storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
198 previousMaxTimestamp = newMaxTimestamp;
199 }
200 assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
201
202
203 LOG.info("Stopping ZK Server");
204 zkServer.stop();
205 LOG.info("ZK Server Stopped");
206
207 try {
208 storage.getMaxTimestamp();
209 fail();
210 } catch (IOException ioe) {
211
212 }
213
214 LOG.info("Restarting ZK again");
215 zkServer.restart();
216 assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
217
218 }
219
220 @Test(timeOut = 10_000)
221 public void testZkClientLosingSession() throws Exception {
222
223
224 long sessionId = zkClient.getZookeeperClient().getZooKeeper().getSessionId();
225 byte[] sessionPasswd = zkClient.getZookeeperClient().getZooKeeper().getSessionPasswd();
226 ZooKeeper zk = new ZooKeeper(ZK_CLUSTER, 1000, null, sessionId, sessionPasswd);
227 zk.close();
228 LOG.info("ZKClient session closed");
229
230
231 long previousMaxTimestamp = INITIAL_MAX_TS_VALUE;
232 for (int i = 0; i < ITERATION_COUNT; i++) {
233 long newMaxTimestamp = previousMaxTimestamp + 1_000_000;
234 storage.updateMaxTimestamp(previousMaxTimestamp, newMaxTimestamp);
235 LOG.info("Updating timestamp. Previous/New {}/{}", previousMaxTimestamp, newMaxTimestamp);
236 previousMaxTimestamp = newMaxTimestamp;
237 }
238 assertEquals(storage.getMaxTimestamp(), 1_000_000 * ITERATION_COUNT);
239
240 }
241
242 }