View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import com.lmax.disruptor.EventFactory;
23  import com.lmax.disruptor.EventHandler;
24  import com.lmax.disruptor.RingBuffer;
25  import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
26  import com.lmax.disruptor.TimeoutHandler;
27  import com.lmax.disruptor.dsl.Disruptor;
28  
29  import org.apache.omid.metrics.MetricsRegistry;
30  import org.apache.omid.tso.TSOStateManager.TSOState;
31  import org.jboss.netty.channel.Channel;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import java.io.IOException;
36  import java.util.Collection;
37  import java.util.HashMap;
38  import java.util.Iterator;
39  import java.util.Map;
40  import java.util.NoSuchElementException;
41  import java.util.concurrent.ExecutorService;
42  import java.util.concurrent.Executors;
43  import java.util.concurrent.ThreadFactory;
44  
45  import static com.lmax.disruptor.dsl.ProducerType.MULTI;
46  import static java.util.concurrent.TimeUnit.MILLISECONDS;
47  import static java.util.concurrent.TimeUnit.SECONDS;
48  import static org.apache.omid.tso.AbstractRequestProcessor.RequestEvent.EVENT_FACTORY;
49  
50  abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestProcessor.RequestEvent>, RequestProcessor, TimeoutHandler {
51  
    // Logger shared by all instances of this class
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class);

    // Disruptor-related attributes
    // Executor handed to the disruptor in the constructor and shut down in close()
    private final ExecutorService disruptorExec;
    protected final Disruptor<RequestEvent> disruptor;
    // Never assigned in this class; presumably set by subclasses when they start the disruptor - TODO confirm
    protected RingBuffer<RequestEvent> requestRing;

    private final TimestampOracle timestampOracle;
    // Consulted and updated per cell id during commit handling (write-write conflict detection)
    private final CommitHashMap hashmap;
    // Table id -> timestamp of the most recent fence created for that table
    private final Map<Long, Long> tableFences;
    // Stored by the constructor; not read by any method of this class
    private final MetricsRegistry metrics;
    private final LowWatermarkWriter lowWatermarkWriter;
    // Commit requests whose start timestamp is not strictly greater than this value are not committed
    private long lowWatermark = -1L;

    // Used to send fence responses
    private final ReplyProcessor replyProcessor;
68  
69      AbstractRequestProcessor(MetricsRegistry metrics,
70                               TimestampOracle timestampOracle,
71                               Panicker panicker,
72                               TSOServerConfig config,
73                               LowWatermarkWriter lowWatermarkWriter, ReplyProcessor replyProcessor)
74              throws IOException {
75  
76  
77          // ------------------------------------------------------------------------------------------------------------
78          // Disruptor initialization
79          // ------------------------------------------------------------------------------------------------------------
80  
81          TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
82  
83          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
84          this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
85  
86          this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
87          disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
88          disruptor.handleEventsWith(this);
89  
90  
91          // ------------------------------------------------------------------------------------------------------------
92          // Attribute initialization
93          // ------------------------------------------------------------------------------------------------------------
94  
95          this.metrics = metrics;
96          this.timestampOracle = timestampOracle;
97          this.hashmap = new CommitHashMap(config.getConflictMapSize());
98          this.tableFences = new HashMap<Long, Long>();
99          this.lowWatermarkWriter = lowWatermarkWriter;
100 
101         this.replyProcessor = replyProcessor;
102 
103         LOG.info("RequestProcessor initialized");
104 
105     }
106 
107     /**
108      * This should be called when the TSO gets leadership
109      */
110     @Override
111     public void update(TSOState state) throws Exception {
112         LOG.info("Initializing RequestProcessor state...");
113         this.lowWatermark = state.getLowWatermark();
114         lowWatermarkWriter.persistLowWatermark(lowWatermark).get(); // Sync persist
115         LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
116     }
117 
118     @Override
119     public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
120 
121         switch (event.getType()) {
122             case TIMESTAMP:
123                 handleTimestamp(event);
124                 break;
125             case COMMIT:
126                 handleCommit(event);
127                 break;
128             case FENCE:
129                 handleFence(event);
130                 break;
131             default:
132                 throw new IllegalStateException("Event not allowed in Request Processor: " + event);
133         }
134 
135     }
136 
    /**
     * Timeout callback (this class implements {@code TimeoutHandler}); delegates to the subclass hook
     * {@link #onTimeout()}.
     *
     * @param sequence sequence reported with the timeout (unused)
     */
    @Override
    public void onTimeout(long sequence) throws Exception {

        // TODO We cannot use this as a timeout trigger for flushing. This timeout is related to the time between
        // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts.
        // TODO (cont) WARNING!!! Take care with the implementation: if any thread other than the request-0
        // TODO (cont) thread calls persistProc.triggerCurrentBatchFlush(), we will incur concurrency issues.
        // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public
        // TODO (cont) methods in persistProc, which guarantees that they are accessed serially.
        onTimeout();
    }
148 
149     @Override
150     public void timestampRequest(Channel c, MonitoringContext monCtx) {
151 
152         monCtx.timerStart("request.processor.timestamp.latency");
153         long seq = requestRing.next();
154         RequestEvent e = requestRing.get(seq);
155         RequestEvent.makeTimestampRequest(e, c, monCtx);
156         requestRing.publish(seq);
157 
158     }
159 
160     @Override
161     public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
162                               MonitoringContext monCtx) {
163 
164         monCtx.timerStart("request.processor.commit.latency");
165         long seq = requestRing.next();
166         RequestEvent e = requestRing.get(seq);
167         RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
168         requestRing.publish(seq);
169 
170     }
171 
172     @Override
173     public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
174 
175         monCtx.timerStart("request.processor.fence.latency");
176         long seq = requestRing.next();
177         RequestEvent e = requestRing.get(seq);
178         RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
179         requestRing.publish(seq);
180 
181     }
182 
183     private void handleTimestamp(RequestEvent requestEvent) throws Exception {
184 
185         long timestamp = timestampOracle.next();
186         requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
187         forwardTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
188     }
189 
190     // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
191     private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
192         if (!tableFences.isEmpty()) {
193             for (long tableId: tableIdSet) {
194                 Long fence = tableFences.get(tableId);
195                 if (fence != null && fence > startTimestamp) {
196                     return true;
197                 }
198                 if (fence != null && fence < lowWatermark) {
199                     tableFences.remove(tableId); // Garbage collect entries of old fences.
200                 }
201             }
202         }
203 
204         return false;
205     }
206 
207  // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
208     private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
209         for (long cellId : writeSet) {
210             long value = hashmap.getLatestWriteForCell(cellId);
211             if (value != 0 && value >= startTimestamp) {
212                 return true;
213             }
214         }
215 
216         return false;
217     }
218 
219     private void handleCommit(RequestEvent event) throws Exception {
220 
221         long startTimestamp = event.getStartTimestamp();
222         Iterable<Long> writeSet = event.writeSet();
223         Collection<Long> tableIdSet = event.getTableIdSet();
224         boolean isCommitRetry = event.isCommitRetry();
225         Channel c = event.getChannel();
226 
227         boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
228 
229         // If the transaction started before the low watermark, or
230         // it started before a fence and modified the table the fence created for, or
231         // it has a write-write conflict with a transaction committed after it started
232         // Then it should abort. Otherwise, it can commit.
233         if (startTimestamp > lowWatermark &&
234             !hasConflictsWithFences(startTimestamp, tableIdSet) &&
235             !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
236 
237             long commitTimestamp = timestampOracle.next();
238             Optional<Long> forwardNewWaterMark = Optional.absent();
239             if (nonEmptyWriteSet) {
240                 long newLowWatermark = lowWatermark;
241 
242                 for (long r : writeSet) {
243                     long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
244                     newLowWatermark = Math.max(removed, newLowWatermark);
245                 }
246 
247                 if (newLowWatermark != lowWatermark) {
248                     LOG.trace("Setting new low Watermark to {}", newLowWatermark);
249                     lowWatermark = newLowWatermark;
250                     forwardNewWaterMark = Optional.of(lowWatermark);
251                 }
252             }
253             event.getMonCtx().timerStop("request.processor.commit.latency");
254             forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);
255 
256         } else {
257 
258             event.getMonCtx().timerStop("request.processor.commit.latency");
259             if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
260                 forwardCommitRetry(startTimestamp, c, event.getMonCtx());
261             } else {
262                 forwardAbort(startTimestamp, c, event.getMonCtx());
263             }
264 
265         }
266 
267     }
268 
269     private void handleFence(RequestEvent event) throws Exception {
270         long tableID = event.getTableId();
271         Channel c = event.getChannel();
272 
273         long fenceTimestamp = timestampOracle.next();
274 
275         tableFences.put(tableID, fenceTimestamp);
276 
277         event.monCtx.timerStart("reply.processor.fence.latency");
278         replyProcessor.sendFenceResponse(tableID, fenceTimestamp, c, event.monCtx);
279     }
280 
281     @Override
282     public void close() throws IOException {
283 
284         LOG.info("Terminating Request Processor...");
285         disruptor.halt();
286         disruptor.shutdown();
287         LOG.info("\tRequest Processor Disruptor shutdown");
288         disruptorExec.shutdownNow();
289         try {
290             disruptorExec.awaitTermination(3, SECONDS);
291             LOG.info("\tRequest Processor Disruptor executor shutdown");
292         } catch (InterruptedException e) {
293             LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
294             Thread.currentThread().interrupt();
295         }
296         LOG.info("Request Processor terminated");
297 
298     }
299 
    /**
     * Called by the commit handler when a transaction passed all checks and was assigned a commit timestamp.
     * The {@code lowWatermark} argument is present only when that commit raised the low watermark.
     */
    protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception;
    /** Called by the commit handler when a request flagged as a retry did not pass the commit checks. */
    protected abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
    /** Called by the commit handler when a request not flagged as a retry did not pass the commit checks. */
    protected abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
    /** Called by the timestamp handler with the timestamp just obtained from the oracle (passed as {@code startTimestamp}). */
    protected abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
    /** Subclass hook invoked from {@link #onTimeout(long)}. */
    protected abstract void onTimeout() throws Exception;
305 
306 
307 
308     final static class RequestEvent implements Iterable<Long> {
309 
310         enum Type {
311             TIMESTAMP, COMMIT, FENCE
312         }
313 
314         private Type type = null;
315         private Channel channel = null;
316 
317         private boolean isCommitRetry = false;
318         private long startTimestamp = 0;
319         private MonitoringContext monCtx;
320         private long numCells = 0;
321 
322         private static final int MAX_INLINE = 40;
323         private Long writeSet[] = new Long[MAX_INLINE];
324         private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
325 
326         private Collection<Long> tableIdSet = null;
327         private long tableID = 0;
328 
329         static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
330             e.type = Type.TIMESTAMP;
331             e.channel = c;
332             e.monCtx = monCtx;
333         }
334 
335         static void makeCommitRequest(RequestEvent e,
336                                       long startTimestamp,
337                                       MonitoringContext monCtx,
338                                       Collection<Long> writeSet,
339                                       Collection<Long> TableIdSet,
340                                       boolean isRetry,
341                                       Channel c) {
342             e.monCtx = monCtx;
343             e.type = Type.COMMIT;
344             e.channel = c;
345             e.startTimestamp = startTimestamp;
346             e.isCommitRetry = isRetry;
347             if (writeSet.size() > MAX_INLINE) {
348                 e.numCells = writeSet.size();
349                 e.writeSetAsCollection = writeSet;
350             } else {
351                 e.writeSetAsCollection = null;
352                 e.numCells = writeSet.size();
353                 int i = 0;
354                 for (Long cellId : writeSet) {
355                     e.writeSet[i] = cellId;
356                     ++i;
357                 }
358             }
359             e.tableIdSet = TableIdSet;
360         }
361 
362         static void makeFenceRequest(RequestEvent e,
363                                      long tableID,
364                                      Channel c,
365                                      MonitoringContext monCtx) {
366             e.type = Type.FENCE;
367             e.channel = c;
368             e.monCtx = monCtx;
369             e.tableID = tableID;
370         }
371 
372         MonitoringContext getMonCtx() {
373             return monCtx;
374         }
375 
376         Type getType() {
377             return type;
378         }
379 
380         long getStartTimestamp() {
381             return startTimestamp;
382         }
383 
384         Channel getChannel() {
385             return channel;
386         }
387 
388         Collection<Long> getTableIdSet() {
389             return tableIdSet;
390         }
391 
392         long getTableId() {
393             return tableID;
394         }
395 
396         @Override
397         public Iterator<Long> iterator() {
398 
399             if (writeSetAsCollection != null) {
400                 return writeSetAsCollection.iterator();
401             }
402 
403             return new Iterator<Long>() {
404                 int i = 0;
405 
406                 @Override
407                 public boolean hasNext() {
408                     return i < numCells;
409                 }
410 
411                 @Override
412                 public Long next() {
413                     if (!hasNext()) {
414                         throw new NoSuchElementException();
415                     }
416                     return writeSet[i++];
417                 }
418 
419                 @Override
420                 public void remove() {
421                     throw new UnsupportedOperationException();
422                 }
423             };
424 
425         }
426 
427         Iterable<Long> writeSet() {
428 
429             return this;
430 
431         }
432 
433         boolean isCommitRetry() {
434             return isCommitRetry;
435         }
436 
437         final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
438             @Override
439             public RequestEvent newInstance() {
440                 return new RequestEvent();
441             }
442         };
443 
444     }
445 
446 }