View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
21  import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
22  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.AbstractScheduledService;
23  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
24  import org.apache.curator.framework.CuratorFramework;
25  import org.apache.curator.utils.EnsurePath;
26  import org.apache.omid.tso.TSOStateManager.TSOState;
27  import org.apache.zookeeper.KeeperException;
28  import org.apache.zookeeper.data.Stat;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import java.io.IOException;
33  import java.text.SimpleDateFormat;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  /**
40   * Encompasses all the required elements to control the leases required for
41   * identifying the master instance when running multiple TSO instances for HA
42   * It delegates the initialization of the TSO state and the publication of
43   * the instance information when getting the lease to an asynchronous task to
44   * continue managing the leases without interruptions.
45   */
46  class LeaseManager extends AbstractScheduledService implements LeaseManagement {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class);
49  
50      private final CuratorFramework zkClient;
51  
52      private final Panicker panicker;
53  
54      private final String tsoHostAndPort;
55  
56      private final TSOStateManager stateManager;
57      private final ExecutorService tsoStateInitializer = Executors.newSingleThreadExecutor(
58              new ThreadFactoryBuilder()
59                      .setNameFormat("tso-state-initializer")
60                      .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
61                          @Override
62                          public void uncaughtException(Thread t, Throwable e) {
63                              panicker.panic(t + " threw exception", e);
64                          }
65                      })
66                      .build());
67  
68  
69      private final long leasePeriodInMs;
70      private final TSOChannelHandler tsoChannelHandler;
71      private int leaseNodeVersion;
72      private final AtomicLong endLeaseInMs = new AtomicLong(0L);
73      private final AtomicLong baseTimeInMs = new AtomicLong(0L);
74  
75      private final String leasePath;
76      private final String currentTSOPath;
77  
78      LeaseManager(String tsoHostAndPort,
79                   TSOChannelHandler tsoChannelHandler,
80                   TSOStateManager stateManager,
81                   long leasePeriodInMs,
82                   String leasePath,
83                   String currentTSOPath,
84                   CuratorFramework zkClient,
85                   Panicker panicker) {
86  
87          this.tsoHostAndPort = tsoHostAndPort;
88          this.tsoChannelHandler = tsoChannelHandler;
89          this.stateManager = stateManager;
90          this.leasePeriodInMs = leasePeriodInMs;
91          this.leasePath = leasePath;
92          this.currentTSOPath = currentTSOPath;
93          this.zkClient = zkClient;
94          this.panicker = panicker;
95          LOG.info("LeaseManager {} initialized. Lease period {}ms", toString(), leasePeriodInMs);
96  
97      }
98  
99      // ----------------------------------------------------------------------------------------------------------------
100     // LeaseManagement implementation
101     // ----------------------------------------------------------------------------------------------------------------
102 
103     @Override
104     public void startService() throws LeaseManagementException {
105         createLeaseManagementZNode();
106         createCurrentTSOZNode();
107         startAsync();
108         awaitRunning();
109     }
110 
111     @Override
112     public void stopService() throws LeaseManagementException {
113         stopAsync();
114         awaitTerminated();
115     }
116 
117     @Override
118     public boolean stillInLeasePeriod() {
119         return System.currentTimeMillis() <= getEndLeaseInMs();
120     }
121 
122     // ----------------------------------------------------------------------------------------------------------------
123     // End LeaseManagement implementation
124     // ----------------------------------------------------------------------------------------------------------------
125 
126     void tryToGetInitialLeasePeriod() throws Exception {
127         baseTimeInMs.set(System.currentTimeMillis());
128         if (canAcquireLease()) {
129             endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
130             LOG.info("{} got the lease (Master) Ver. {}/End of lease: {}ms", tsoHostAndPort,
131                      leaseNodeVersion, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(endLeaseInMs));
132             tsoStateInitializer.submit(new Runnable() {
133                 // TSO State initialization
134                 @Override
135                 public void run() {
136                     try {
137                         TSOState newTSOState = stateManager.initialize();
138                         advertiseTSOServerInfoThroughZK(newTSOState.getEpoch());
139                         tsoChannelHandler.reconnect();
140                     } catch (Exception e) {
141                         Thread t = Thread.currentThread();
142                         t.getUncaughtExceptionHandler().uncaughtException(t, e);
143                     }
144                 }
145             });
146         } else {
147             tsoStateInitializer.submit(new Runnable() {
148                 // TSO State initialization
149                 @Override
150                 public void run() {
151                     // In case the TSO was paused close the connection
152                     tsoChannelHandler.closeConnection();
153                 }
154             });
155         }
156     }
157 
158     void tryToRenewLeasePeriod() throws Exception {
159         baseTimeInMs.set(System.currentTimeMillis());
160         if (canAcquireLease()) {
161             if (System.currentTimeMillis() > getEndLeaseInMs()) {
162                 endLeaseInMs.set(0L);
163                 panicker.panic(tsoHostAndPort + " expired lease! Master is committing suicide");
164             } else {
165                 endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
166                 LOG.trace("{} renewed lease: Version {}/End of lease at {}ms",
167                           tsoHostAndPort, leaseNodeVersion, endLeaseInMs);
168             }
169         } else {
170             endLeaseInMs.set(0L);
171             panicker.panic(tsoHostAndPort + " lease lost (Ver. " + leaseNodeVersion + ")! Other instance is Master. Committing suicide...");
172         }
173     }
174 
175     private boolean haveLease() {
176         return stillInLeasePeriod();
177     }
178 
179     private long getEndLeaseInMs() {
180         return endLeaseInMs.get();
181     }
182 
183     private boolean canAcquireLease() throws Exception {
184         try {
185             int previousLeaseNodeVersion = leaseNodeVersion;
186             final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);
187             // Try to acquire the lease
188             Stat stat = zkClient.setData().withVersion(previousLeaseNodeVersion)
189                     .forPath(leasePath, instanceInfo);
190             leaseNodeVersion = stat.getVersion();
191             LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
192         } catch (KeeperException.BadVersionException e) {
193             return false;
194         }
195         return true;
196     }
197 
198     // ----------------------------------------------------------------------------------------------------------------
199     // AbstractScheduledService implementation
200     // ----------------------------------------------------------------------------------------------------------------
201 
202     @Override
203     protected void startUp() {
204     }
205 
206     @Override
207     protected void shutDown() {
208         try {
209             tsoChannelHandler.close();
210             LOG.info("Channel handler closed");
211         } catch (IOException e) {
212             LOG.error("Error closing TSOChannelHandler", e);
213         }
214     }
215 
216     @Override
217     protected void runOneIteration() throws Exception {
218 
219         if (!haveLease()) {
220             tryToGetInitialLeasePeriod();
221         } else {
222             tryToRenewLeasePeriod();
223         }
224 
225     }
226 
227     @Override
228     protected Scheduler scheduler() {
229 
230         final long guardLeasePeriodInMs = leasePeriodInMs / 4;
231 
232         return new AbstractScheduledService.CustomScheduler() {
233 
234             @Override
235             protected Schedule getNextSchedule() throws Exception {
236                 if (!haveLease()) {
237                     // Get the current node version...
238                     Stat stat = zkClient.checkExists().forPath(leasePath);
239                     leaseNodeVersion = stat.getVersion();
240                     LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
241                               leasePeriodInMs);
242                     // ...and wait the lease period
243                     return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
244                 } else {
245                     long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
246                     LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
247                               leaseNodeVersion, waitTimeInMs);
248                     return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
249                 }
250             }
251         };
252 
253     }
254 
255     // ----------------------------------------------------------------------------------------------------------------
256     // Helper methods
257     // ----------------------------------------------------------------------------------------------------------------
258 
259     @Override
260     public String toString() {
261         return tsoHostAndPort;
262     }
263 
264     private void createLeaseManagementZNode() throws LeaseManagementException {
265         try {
266             validateZKPath(leasePath);
267         } catch (Exception e) {
268             throw new LeaseManagementException("Error creating Lease Management ZNode", e);
269         }
270     }
271 
272     private void createCurrentTSOZNode() throws LeaseManagementException {
273         try {
274             validateZKPath(currentTSOPath);
275         } catch (Exception e) {
276             throw new LeaseManagementException("Error creating TSO ZNode", e);
277         }
278     }
279 
280     private void validateZKPath(String zkPath) throws Exception {
281         EnsurePath path = zkClient.newNamespaceAwareEnsurePath(zkPath);
282         path.ensure(zkClient.getZookeeperClient());
283         Stat stat = zkClient.checkExists().forPath(zkPath);
284         Preconditions.checkNotNull(stat);
285         LOG.info("Path {} ensured", path.getPath());
286     }
287 
288     private void advertiseTSOServerInfoThroughZK(long epoch) throws Exception {
289 
290         Stat previousTSOZNodeStat = new Stat();
291         byte[] previousTSOInfoAsBytes = zkClient.getData().storingStatIn(previousTSOZNodeStat).forPath(currentTSOPath);
292         if (previousTSOInfoAsBytes != null && !new String(previousTSOInfoAsBytes, Charsets.UTF_8).isEmpty()) {
293             String previousTSOInfo = new String(previousTSOInfoAsBytes, Charsets.UTF_8);
294             String[] previousTSOAndEpochArray = previousTSOInfo.split("#");
295             Preconditions.checkArgument(previousTSOAndEpochArray.length == 2, "Incorrect TSO Info found: ", previousTSOInfo);
296             long oldEpoch = Long.parseLong(previousTSOAndEpochArray[1]);
297             if (oldEpoch > epoch) {
298                 throw new LeaseManagementException("Another TSO replica was found " + previousTSOInfo);
299             }
300         }
301         String tsoInfoAsString = tsoHostAndPort + "#" + Long.toString(epoch);
302         byte[] tsoInfoAsBytes = tsoInfoAsString.getBytes(Charsets.UTF_8);
303         zkClient.setData().withVersion(previousTSOZNodeStat.getVersion()).forPath(currentTSOPath, tsoInfoAsBytes);
304         LOG.info("TSO instance {} (Epoch {}) advertised through ZK", tsoHostAndPort, epoch);
305 
306     }
307 
308 }