1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import com.google.inject.name.Named;
23 import com.lmax.disruptor.EventFactory;
24 import com.lmax.disruptor.EventHandler;
25 import com.lmax.disruptor.RingBuffer;
26 import com.lmax.disruptor.WaitStrategy;
27 import com.lmax.disruptor.dsl.Disruptor;
28
29 import org.apache.commons.pool2.ObjectPool;
30 import org.apache.omid.committable.CommitTable;
31 import org.apache.omid.committable.CommitTable.CommitTimestamp;
32 import org.apache.omid.metrics.Meter;
33 import org.apache.omid.metrics.MetricsRegistry;
34 import org.jboss.netty.channel.Channel;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import javax.inject.Inject;
39
40 import java.io.IOException;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.ThreadFactory;
45
46 import static com.codahale.metrics.MetricRegistry.name;
47 import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
48 import static java.util.concurrent.TimeUnit.SECONDS;
49 import static org.apache.omid.tso.RetryProcessorImpl.RetryEvent.EVENT_FACTORY;
50
51
52
53
54
55 class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
56
57 private static final Logger LOG = LoggerFactory.getLogger(RetryProcessor.class);
58
59
60 private final ExecutorService disruptorExec;
61 private final Disruptor<RetryEvent> disruptor;
62 private final RingBuffer<RetryEvent> retryRing;
63
64 final ReplyProcessor replyProc;
65
66 final CommitTable.Client commitTableClient;
67 final ObjectPool<Batch> batchPool;
68
69
70 private final Meter txAlreadyCommittedMeter;
71 private final Meter invalidTxMeter;
72 private final Meter noCTFoundMeter;
73
74 @Inject
75 RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy strategy,
76 MetricsRegistry metrics,
77 CommitTable commitTable,
78 ReplyProcessor replyProc,
79 Panicker panicker,
80 ObjectPool<Batch> batchPool)
81 throws InterruptedException, ExecutionException, IOException {
82
83
84
85
86
87 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
88 this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
89
90 this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, strategy);
91 disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
92 disruptor.handleEventsWith(this);
93 this.retryRing = disruptor.start();
94
95
96
97
98
99 this.commitTableClient = commitTable.getClient();
100 this.replyProc = replyProc;
101 this.batchPool = batchPool;
102
103
104 this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
105 this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
106 this.noCTFoundMeter = metrics.meter(name("tso", "retries", "aborts", "tx-without-commit-timestamp"));
107
108 LOG.info("RetryProcessor initialized");
109
110 }
111
112 @Override
113 public void onEvent(final RetryEvent event, final long sequence, final boolean endOfBatch) throws Exception {
114
115 switch (event.getType()) {
116 case COMMIT:
117 handleCommitRetry(event);
118 event.getMonCtx().timerStop("retry.processor.commit-retry.latency");
119 break;
120 default:
121 assert (false);
122 break;
123 }
124 event.getMonCtx().publish();
125
126 }
127
128 private void handleCommitRetry(RetryEvent event) {
129
130 long startTimestamp = event.getStartTimestamp();
131 try {
132 Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
133 if (commitTimestamp.isPresent()) {
134 if (commitTimestamp.get().isValid()) {
135 LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
136 replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx(), Optional.<Long>absent());
137 txAlreadyCommittedMeter.mark();
138 } else {
139 LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
140 replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
141 invalidTxMeter.mark();
142 }
143 } else {
144 LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
145 replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
146 noCTFoundMeter.mark();
147 }
148 } catch (InterruptedException e) {
149 LOG.error("Interrupted reading from commit table");
150 Thread.currentThread().interrupt();
151 } catch (ExecutionException e) {
152 LOG.error("Error reading from commit table", e);
153 }
154
155 }
156
157 @Override
158 public void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx) {
159 long seq = retryRing.next();
160 RetryEvent e = retryRing.get(seq);
161 monCtx.timerStart("retry.processor.commit-retry.latency");
162 RetryEvent.makeCommitRetry(e, startTimestamp, c, monCtx);
163 retryRing.publish(seq);
164 }
165
166 @Override
167 public void close() throws IOException {
168
169 LOG.info("Terminating Retry Processor...");
170 disruptor.halt();
171 disruptor.shutdown();
172 LOG.info("\tRetry Processor Disruptor shutdown");
173 disruptorExec.shutdownNow();
174 try {
175 disruptorExec.awaitTermination(3, SECONDS);
176 LOG.info("\tRetry Processor Disruptor executor shutdown");
177 } catch (InterruptedException e) {
178 LOG.error("Interrupted whilst finishing Retry Processor Disruptor executor");
179 Thread.currentThread().interrupt();
180 }
181 LOG.info("Retry Processor terminated");
182
183 }
184
185 public final static class RetryEvent {
186
187 enum Type {
188 COMMIT
189 }
190
191 private Type type = null;
192
193 private long startTimestamp = 0;
194 private Channel channel = null;
195 private MonitoringContext monCtx;
196
197 static void makeCommitRetry(RetryEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
198 e.monCtx = monCtx;
199 e.type = Type.COMMIT;
200 e.startTimestamp = startTimestamp;
201 e.channel = c;
202 }
203
204 MonitoringContext getMonCtx() {
205 return monCtx;
206 }
207
208 Type getType() {
209 return type;
210 }
211
212 Channel getChannel() {
213 return channel;
214 }
215
216 long getStartTimestamp() {
217 return startTimestamp;
218 }
219
220 public final static EventFactory<RetryEvent> EVENT_FACTORY = new EventFactory<RetryEvent>() {
221 @Override
222 public RetryEvent newInstance() {
223 return new RetryEvent();
224 }
225 };
226
227 }
228
229 }