1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21 import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
22 import com.lmax.disruptor.WorkHandler;
23 import org.apache.omid.committable.CommitTable;
24 import org.apache.omid.metrics.Histogram;
25 import org.apache.omid.metrics.MetricsRegistry;
26 import org.apache.omid.metrics.Timer;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import javax.inject.Inject;
31 import java.io.IOException;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import static com.codahale.metrics.MetricRegistry.name;
36 import static org.apache.omid.tso.PersistEvent.Type.COMMIT_RETRY;
37
38
39 public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
40
41 private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorHandler.class);
42
43 @VisibleForTesting
44 static final AtomicInteger consecutiveSequenceCreator = new AtomicInteger(0);
45
46 private final String id;
47
48 private final String tsoHostAndPort;
49 private final LeaseManagement leaseManager;
50
51 private final ReplyProcessor replyProcessor;
52 private final RetryProcessor retryProcessor;
53 private final CommitTable.Writer writer;
54 final Panicker panicker;
55
56
57 private final Timer flushTimer;
58 private final Histogram batchSizeHistogram;
59 private final Histogram flushedCommitEventsHistogram;
60
61 @Inject
62 PersistenceProcessorHandler(MetricsRegistry metrics,
63 String tsoHostAndPort,
64 LeaseManagement leaseManager,
65 CommitTable commitTable,
66 ReplyProcessor replyProcessor,
67 RetryProcessor retryProcessor,
68 Panicker panicker)
69 throws InterruptedException, ExecutionException, IOException {
70
71 this.id = String.valueOf(consecutiveSequenceCreator.getAndIncrement());
72 this.tsoHostAndPort = tsoHostAndPort;
73 this.leaseManager = leaseManager;
74 this.writer = commitTable.getWriter();
75 this.replyProcessor = replyProcessor;
76 this.retryProcessor = retryProcessor;
77 this.panicker = panicker;
78
79
80 String flushTimerName = name("tso", "persistence-processor-handler", id, "flush", "latency");
81 flushTimer = metrics.timer(flushTimerName);
82 String flushedCommitEventsName = name("tso", "persistence-processor-handler", id, "flushed", "commits", "size");
83 flushedCommitEventsHistogram = metrics.histogram(flushedCommitEventsName);
84 String batchSizeMetricsName = name("tso", "persistence-processor-handler", id, "batch", "size");
85 batchSizeHistogram = metrics.histogram(batchSizeMetricsName);
86
87 }
88
89 public String getId() {
90 return id;
91 }
92
93 @Override
94 public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
95
96 int commitEventsToFlush = 0;
97 Batch batch = batchEvent.getBatch();
98 int numOfBatchedEvents = batch.getNumEvents();
99 batchSizeHistogram.update(numOfBatchedEvents);
100 for (int i=0; i < numOfBatchedEvents; i++) {
101 PersistEvent event = batch.get(i);
102 switch (event.getType()) {
103 case TIMESTAMP:
104 event.getMonCtx().timerStop("persistence.processor.timestamp.latency");
105 break;
106 case COMMIT:
107 writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
108 commitEventsToFlush++;
109 break;
110 case COMMIT_RETRY:
111 event.getMonCtx().timerStop("persistence.processor.commit-retry.latency");
112 break;
113 case ABORT:
114 event.getMonCtx().timerStop("persistence.processor.abort.latency");
115 break;
116 case FENCE:
117
118 writer.addCommittedTransaction(event.getCommitTimestamp(), event.getCommitTimestamp());
119 commitEventsToFlush++;
120 break;
121 default:
122 throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
123 }
124 }
125
126
127
128 flush(commitEventsToFlush);
129 filterAndDissambiguateClientRetries(batch);
130 for (int i=0; i < batch.getNumEvents(); i++) {
131 PersistEvent event = batch.get(i);
132 switch (event.getType()) {
133 case TIMESTAMP:
134 event.getMonCtx().timerStart("reply.processor.timestamp.latency");
135 break;
136 case COMMIT:
137 event.getMonCtx().timerStop("persistence.processor.commit.latency");
138 event.getMonCtx().timerStart("reply.processor.commit.latency");
139 break;
140 case COMMIT_RETRY:
141 throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
142 case ABORT:
143 event.getMonCtx().timerStart("reply.processor.abort.latency");
144 break;
145 case FENCE:
146 event.getMonCtx().timerStop("persistence.processor.fence.latency");
147 event.getMonCtx().timerStart("reply.processor.fence.latency");
148 break;
149 default:
150 throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
151 }
152 }
153 replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);
154
155 }
156
157 void flush(int commitEventsToFlush) {
158
159 commitSuicideIfNotMaster();
160 try {
161 long startFlushTimeInNs = System.nanoTime();
162 if(commitEventsToFlush > 0) {
163 writer.flush();
164 }
165 flushTimer.update(System.nanoTime() - startFlushTimeInNs);
166 flushedCommitEventsHistogram.update(commitEventsToFlush);
167 } catch (IOException e) {
168 panicker.panic("Error persisting commit batch", e);
169 }
170 commitSuicideIfNotMaster();
171
172 }
173
174 private void commitSuicideIfNotMaster() {
175 if (!leaseManager.stillInLeasePeriod()) {
176 panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
177 }
178 }
179
180 void filterAndDissambiguateClientRetries(Batch batch) {
181
182 int currentEventIdx = 0;
183 while (currentEventIdx <= batch.getLastEventIdx()) {
184 PersistEvent event = batch.get(currentEventIdx);
185 if (event.getType() == COMMIT_RETRY) {
186 retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
187
188 swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
189 batch.decreaseNumEvents();
190 if (batch.isEmpty()) {
191 break;
192 } else {
193 continue;
194 }
195 } else {
196 currentEventIdx++;
197 }
198 }
199
200 }
201
202 private void swapBatchElements(Batch batch, int firstIdx, int lastIdx) {
203 PersistEvent tmpEvent = batch.get(firstIdx);
204 PersistEvent lastEventInBatch = batch.get(lastIdx);
205 batch.set(firstIdx, lastEventInBatch);
206 batch.set(lastIdx, tmpEvent);
207 }
208
209 @Override
210 public String toString() {
211
212 return MoreObjects.toStringHelper(this).add("id", id).toString();
213
214 }
215
216 }