1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid;
19
20 import org.apache.commons.io.IOUtils;
21 import org.apache.curator.RetryPolicy;
22 import org.apache.curator.framework.CuratorFramework;
23 import org.apache.curator.framework.CuratorFrameworkFactory;
24 import org.apache.curator.retry.ExponentialBackoffRetry;
25 import org.apache.curator.test.TestingServer;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.io.IOException;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32
33
34
35
36 public class TestUtils {
37
38 private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
39
40 private static final int DEFAULT_ZK_PORT = 2181;
41
42 public static int getFreeLocalPort() throws IOException {
43
44 try (ServerSocket socket = new ServerSocket(0)) {
45 socket.setReuseAddress(true);
46 return socket.getLocalPort();
47 }
48
49 }
50
51 public static TestingServer provideTestingZKServer(int port) throws Exception {
52
53 return new TestingServer(port);
54
55 }
56
57 public static TestingServer provideTestingZKServer() throws Exception {
58
59 return provideTestingZKServer(DEFAULT_ZK_PORT);
60
61 }
62
63 public static CuratorFramework provideConnectedZKClient(String zkCluster) throws Exception {
64
65 LOG.info("Creating Zookeeper Client connecting to {}", zkCluster);
66
67 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
68 CuratorFramework zkClient = CuratorFrameworkFactory.builder().namespace("omid")
69 .connectString(zkCluster).retryPolicy(retryPolicy).build();
70
71 LOG.info("Connecting to ZK cluster {}", zkClient.getState());
72 zkClient.start();
73 zkClient.blockUntilConnected();
74 LOG.info("Connection to ZK cluster {}", zkClient.getState());
75
76 return zkClient;
77 }
78
79 public static void waitForSocketListening(String host, int port, int sleepTimeMillis)
80 throws IOException, InterruptedException {
81 while (true) {
82 Socket sock = null;
83 try {
84 sock = new Socket(host, port);
85 } catch (IOException e) {
86
87 Thread.sleep(sleepTimeMillis);
88 continue;
89 } finally {
90 IOUtils.closeQuietly(sock);
91 }
92 LOG.info("Host " + host + ":" + port + " is up...");
93 break;
94 }
95 }
96
97 public static void waitForSocketNotListening(String host, int port, int sleepTimeMillis)
98 throws IOException, InterruptedException {
99 while (true) {
100 Socket sock = null;
101 try {
102 sock = new Socket(host, port);
103 } catch (IOException e) {
104
105 break;
106 } finally {
107 IOUtils.closeQuietly(sock);
108 }
109 Thread.sleep(sleepTimeMillis);
110 LOG.info("Host " + host + ":" + port + " is still up...");
111 }
112 }
113
114 }