1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
22 import org.apache.commons.pool2.ObjectPool;
23 import org.apache.omid.metrics.MetricsRegistry;
24 import org.apache.omid.metrics.NullMetricsProvider;
25 import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
26 import org.jboss.netty.channel.Channel;
27 import org.mockito.InOrder;
28 import org.mockito.Mock;
29 import org.mockito.MockitoAnnotations;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.testng.annotations.AfterMethod;
33 import org.testng.annotations.BeforeMethod;
34 import org.testng.annotations.Test;
35
36 import com.lmax.disruptor.BlockingWaitStrategy;
37
38 import static org.mockito.Matchers.any;
39 import static org.mockito.Matchers.eq;
40 import static org.mockito.Mockito.*;
41 import static org.testng.Assert.assertEquals;
42 import static org.testng.Assert.assertFalse;
43 import static org.testng.Assert.assertTrue;
44 import static org.testng.Assert.fail;
45
46 public class TestReplyProcessor {
47
48 private static final Logger LOG = LoggerFactory.getLogger(TestReplyProcessor.class);
49
50 private static final long ANY_DISRUPTOR_SEQUENCE = 1234L;
51
52 public static final int BATCH_POOL_SIZE = 3;
53
54 private static final long FIRST_ST = 0L;
55 private static final long FIRST_CT = 1L;
56 private static final long SECOND_ST = 2L;
57 private static final long SECOND_CT = 3L;
58 private static final long THIRD_ST = 4L;
59 private static final long THIRD_CT = 5L;
60 private static final long FOURTH_ST = 6L;
61 private static final long FOURTH_CT = 7L;
62 private static final long FIFTH_ST = 8L;
63 private static final long FIFTH_CT = 9L;
64 private static final long SIXTH_ST = 10L;
65 private static final long SIXTH_CT = 11L;
66
67 @Mock
68 private Panicker panicker;
69
70 @Mock
71 private MonitoringContextImpl monCtx;
72
73 private MetricsRegistry metrics;
74
75 private ObjectPool<Batch> batchPool;
76
77
78 private ReplyProcessorImpl replyProcessor;
79 private LowWatermarkWriter lowWatermarkWriter;
80
81 @BeforeMethod(alwaysRun = true, timeOut = 30_000)
82 public void initMocksAndComponents() throws Exception {
83
84 MockitoAnnotations.initMocks(this);
85
86 TSOServerConfig tsoConfig = new TSOServerConfig();
87 tsoConfig.setNumConcurrentCTWriters(BATCH_POOL_SIZE);
88
89
90 metrics = new NullMetricsProvider();
91
92 batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
93
94
95 lowWatermarkWriter = mock(LowWatermarkWriter.class);
96 SettableFuture<Void> f = SettableFuture.create();
97 f.set(null);
98 doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
99
100 replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter));
101 }
102
103 @AfterMethod
104 void afterMethod() {
105 }
106
107 @Test(timeOut = 10_000)
108 public void testBadFormedPackageThrowsException() throws Exception {
109
110
111 replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool, lowWatermarkWriter));
112
113
114 Batch batch = batchPool.borrowObject();
115 batch.addCommitRetry(FIRST_ST, null, monCtx);
116 ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
117 ReplyBatchEvent.makeReplyBatch(e, batch, 0);
118
119 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
120 assertEquals(replyProcessor.futureEvents.size(), 0);
121 assertEquals(batchPool.getNumActive(), 1);
122 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
123
124 try {
125 replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
126 fail();
127 } catch (RuntimeException re) {
128
129 }
130
131 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
132 assertEquals(replyProcessor.futureEvents.size(), 0);
133 assertEquals(batchPool.getNumActive(), 1);
134 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
135
136 }
137
138 @Test(timeOut = 10_000)
139 public void testUnorderedBatchSequenceGetsSaved() throws Exception {
140
141 final long HIGH_SEQUENCE_NUMBER = 1234L;
142
143
144 Batch batch = batchPool.borrowObject();
145 ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
146 ReplyBatchEvent.makeReplyBatch(e, batch, HIGH_SEQUENCE_NUMBER);
147
148 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
149 assertEquals(replyProcessor.futureEvents.size(), 0);
150 assertEquals(batchPool.getNumActive(), 1);
151 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
152
153 replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
154
155 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
156 assertEquals(replyProcessor.futureEvents.size(), 1);
157 assertEquals(batchPool.getNumActive(), 1);
158 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
159 assertTrue(batch.isEmpty());
160 verify(replyProcessor, times(0)).handleReplyBatchEvent(any(ReplyBatchEvent.class));
161
162 }
163
164 @Test(timeOut = 10_000)
165 public void testProcessingOfEmptyBatchReplyEvent() throws Exception {
166
167
168 Batch batch = batchPool.borrowObject();
169 ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
170 ReplyBatchEvent.makeReplyBatch(e, batch, 0);
171
172 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
173 assertEquals(replyProcessor.futureEvents.size(), 0);
174 assertEquals(batchPool.getNumActive(), 1);
175 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
176
177 replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
178
179 assertEquals(replyProcessor.nextIDToHandle.get(), 1);
180 assertEquals(replyProcessor.futureEvents.size(), 0);
181 assertEquals(batchPool.getNumActive(), 0);
182 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
183 assertTrue(batch.isEmpty());
184 verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(e));
185
186 }
187
188 @Test(timeOut = 10_000)
189 public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendingReplies() throws Exception {
190
191
192
193
194 Batch thirdBatch = batchPool.borrowObject();
195 thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
196 thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
197 ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
198 ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 2);
199
200 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
201 assertEquals(replyProcessor.futureEvents.size(), 0);
202 assertEquals(batchPool.getNumActive(), 1);
203 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
204
205 replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
206
207 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
208 assertEquals(replyProcessor.futureEvents.size(), 1);
209 assertEquals(batchPool.getNumActive(), 1);
210 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
211 assertFalse(thirdBatch.isEmpty());
212 verify(replyProcessor, never()).handleReplyBatchEvent(eq(thirdBatchEvent));
213
214
215 Batch secondBatch = batchPool.borrowObject();
216 secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), monCtx);
217 secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
218 ReplyBatchEvent secondBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
219 ReplyBatchEvent.makeReplyBatch(secondBatchEvent, secondBatch, 1);
220
221 replyProcessor.onEvent(secondBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
222
223 assertEquals(replyProcessor.nextIDToHandle.get(), 0);
224 assertEquals(replyProcessor.futureEvents.size(), 2);
225 assertEquals(batchPool.getNumActive(), 2);
226 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 2);
227 assertFalse(secondBatch.isEmpty());
228 assertFalse(thirdBatch.isEmpty());
229
230
231 Batch firstBatch = batchPool.borrowObject();
232 firstBatch.addAbort(FIFTH_ST, mock(Channel.class), monCtx);
233 ReplyBatchEvent firstBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
234 ReplyBatchEvent.makeReplyBatch(firstBatchEvent, firstBatch, 0);
235
236 replyProcessor.onEvent(firstBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
237
238 assertEquals(replyProcessor.nextIDToHandle.get(), 3);
239 assertEquals(replyProcessor.futureEvents.size(), 0);
240 assertEquals(batchPool.getNumActive(), 0);
241 assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
242 assertTrue(firstBatch.isEmpty());
243 assertTrue(secondBatch.isEmpty());
244 assertTrue(thirdBatch.isEmpty());
245
246
247
248 InOrder inOrderReplyBatchEvents = inOrder(replyProcessor, replyProcessor, replyProcessor);
249 inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(firstBatchEvent));
250 inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(secondBatchEvent));
251 inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
252
253 InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
254 inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
255 inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
256 inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx), any(Optional.class));
257 inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
258 inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx), any(Optional.class));
259
260 }
261
262 @Test
263 public void testUpdateLowWaterMarkOnlyForMaxInBatch() throws Exception {
264
265 Batch thirdBatch = batchPool.borrowObject();
266 thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
267 thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.of(100L));
268 thirdBatch.addCommit(THIRD_ST, THIRD_CT, mock(Channel.class), monCtx, Optional.of(50L));
269 thirdBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
270 thirdBatch.addCommit(FIFTH_ST, FIFTH_CT, mock(Channel.class), monCtx, Optional.of(100L));
271 thirdBatch.addCommit(SIXTH_ST, SIXTH_CT, mock(Channel.class), monCtx, Optional.of(150L));
272
273 ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
274 ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 0);
275
276 replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
277
278 InOrder inOrderWatermarkWriter = inOrder(lowWatermarkWriter, lowWatermarkWriter, lowWatermarkWriter);
279
280 inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(100L));
281 inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(150L));
282
283 verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(50L));
284
285 InOrder inOrderCheckLWM = inOrder(replyProcessor, replyProcessor,replyProcessor,replyProcessor,replyProcessor);
286 inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
287 inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(50L));
288 inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.<Long>absent());
289 inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
290 inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(150L));
291
292 }
293
294 }