View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
22  import org.apache.commons.pool2.ObjectPool;
23  import org.apache.omid.metrics.MetricsRegistry;
24  import org.apache.omid.metrics.NullMetricsProvider;
25  import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
26  import org.jboss.netty.channel.Channel;
27  import org.mockito.InOrder;
28  import org.mockito.Mock;
29  import org.mockito.MockitoAnnotations;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  import org.testng.annotations.AfterMethod;
33  import org.testng.annotations.BeforeMethod;
34  import org.testng.annotations.Test;
35  
36  import com.lmax.disruptor.BlockingWaitStrategy;
37  
38  import static org.mockito.Matchers.any;
39  import static org.mockito.Matchers.eq;
40  import static org.mockito.Mockito.*;
41  import static org.testng.Assert.assertEquals;
42  import static org.testng.Assert.assertFalse;
43  import static org.testng.Assert.assertTrue;
44  import static org.testng.Assert.fail;
45  
46  public class TestReplyProcessor {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(TestReplyProcessor.class);
49  
50      private static final long ANY_DISRUPTOR_SEQUENCE = 1234L;
51  
52      public static final int BATCH_POOL_SIZE = 3;
53  
54      private static final long FIRST_ST = 0L;
55      private static final long FIRST_CT = 1L;
56      private static final long SECOND_ST = 2L;
57      private static final long SECOND_CT = 3L;
58      private static final long THIRD_ST = 4L;
59      private static final long THIRD_CT = 5L;
60      private static final long FOURTH_ST = 6L;
61      private static final long FOURTH_CT = 7L;
62      private static final long FIFTH_ST = 8L;
63      private static final long FIFTH_CT = 9L;
64      private static final long SIXTH_ST = 10L;
65      private static final long SIXTH_CT = 11L;
66  
67      @Mock
68      private Panicker panicker;
69  
70      @Mock
71      private MonitoringContextImpl monCtx;
72  
73      private MetricsRegistry metrics;
74  
75      private ObjectPool<Batch> batchPool;
76  
77      // Component under test
78      private ReplyProcessorImpl replyProcessor;
79      private LowWatermarkWriter lowWatermarkWriter;
80  
81      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
82      public void initMocksAndComponents() throws Exception {
83  
84          MockitoAnnotations.initMocks(this);
85  
86          TSOServerConfig tsoConfig = new TSOServerConfig();
87          tsoConfig.setNumConcurrentCTWriters(BATCH_POOL_SIZE);
88  
89          // Configure null metrics provider
90          metrics = new NullMetricsProvider();
91  
92          batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
93  
94  
95          lowWatermarkWriter = mock(LowWatermarkWriter.class);
96          SettableFuture<Void> f = SettableFuture.create();
97          f.set(null);
98          doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
99  
100         replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter));
101     }
102 
103     @AfterMethod
104     void afterMethod() {
105     }
106 
107     @Test(timeOut = 10_000)
108     public void testBadFormedPackageThrowsException() throws Exception {
109 
110         // We need an instance throwing exceptions for this test
111         replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool, lowWatermarkWriter));
112 
113         // Prepare test batch
114         Batch batch = batchPool.borrowObject();
115         batch.addCommitRetry(FIRST_ST, null, monCtx);
116         ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
117         ReplyBatchEvent.makeReplyBatch(e, batch, 0);
118 
119         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
120         assertEquals(replyProcessor.futureEvents.size(), 0);
121         assertEquals(batchPool.getNumActive(), 1);
122         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
123 
124         try {
125             replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
126             fail();
127         } catch (RuntimeException re) {
128             // Expected
129         }
130 
131         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
132         assertEquals(replyProcessor.futureEvents.size(), 0);
133         assertEquals(batchPool.getNumActive(), 1);
134         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
135 
136     }
137 
138     @Test(timeOut = 10_000)
139     public void testUnorderedBatchSequenceGetsSaved() throws Exception {
140 
141         final long HIGH_SEQUENCE_NUMBER = 1234L; // Should be greater than 0
142 
143         // Prepare test batch
144         Batch batch = batchPool.borrowObject();
145         ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
146         ReplyBatchEvent.makeReplyBatch(e, batch, HIGH_SEQUENCE_NUMBER);
147 
148         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
149         assertEquals(replyProcessor.futureEvents.size(), 0);
150         assertEquals(batchPool.getNumActive(), 1);
151         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
152 
153         replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
154 
155         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
156         assertEquals(replyProcessor.futureEvents.size(), 1);
157         assertEquals(batchPool.getNumActive(), 1);
158         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
159         assertTrue(batch.isEmpty());
160         verify(replyProcessor, times(0)).handleReplyBatchEvent(any(ReplyBatchEvent.class));
161 
162     }
163 
164     @Test(timeOut = 10_000)
165     public void testProcessingOfEmptyBatchReplyEvent() throws Exception {
166 
167         // Prepare test batch
168         Batch batch = batchPool.borrowObject();
169         ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
170         ReplyBatchEvent.makeReplyBatch(e, batch, 0);
171 
172         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
173         assertEquals(replyProcessor.futureEvents.size(), 0);
174         assertEquals(batchPool.getNumActive(), 1);
175         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
176 
177         replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
178 
179         assertEquals(replyProcessor.nextIDToHandle.get(), 1);
180         assertEquals(replyProcessor.futureEvents.size(), 0);
181         assertEquals(batchPool.getNumActive(), 0);
182         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
183         assertTrue(batch.isEmpty());
184         verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(e));
185 
186     }
187 
188     @Test(timeOut = 10_000)
189     public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendingReplies() throws Exception {
190 
191         // Prepare 3 batches with events and simulate a different order of arrival using the batch sequence
192 
193         // Prepare first a delayed batch (Batch #3)
194         Batch thirdBatch = batchPool.borrowObject();
195         thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
196         thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
197         ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
198         ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 2); // Set a higher sequence than the initial one
199 
200         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
201         assertEquals(replyProcessor.futureEvents.size(), 0);
202         assertEquals(batchPool.getNumActive(), 1);
203         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
204 
205         replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
206 
207         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
208         assertEquals(replyProcessor.futureEvents.size(), 1);
209         assertEquals(batchPool.getNumActive(), 1);
210         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
211         assertFalse(thirdBatch.isEmpty());
212         verify(replyProcessor, never()).handleReplyBatchEvent(eq(thirdBatchEvent));
213 
214         // Prepare another delayed batch (Batch #2)
215         Batch secondBatch = batchPool.borrowObject();
216         secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), monCtx);
217         secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
218         ReplyBatchEvent secondBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
219         ReplyBatchEvent.makeReplyBatch(secondBatchEvent, secondBatch, 1); // Set another higher sequence
220 
221         replyProcessor.onEvent(secondBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
222 
223         assertEquals(replyProcessor.nextIDToHandle.get(), 0);
224         assertEquals(replyProcessor.futureEvents.size(), 2);
225         assertEquals(batchPool.getNumActive(), 2);
226         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 2);
227         assertFalse(secondBatch.isEmpty());
228         assertFalse(thirdBatch.isEmpty());
229 
230         // Finally, prepare the batch that should trigger the execution of the other two
231         Batch firstBatch = batchPool.borrowObject();
232         firstBatch.addAbort(FIFTH_ST, mock(Channel.class), monCtx);
233         ReplyBatchEvent firstBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
234         ReplyBatchEvent.makeReplyBatch(firstBatchEvent, firstBatch, 0); // Set the first batch with a higher sequence
235 
236         replyProcessor.onEvent(firstBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
237 
238         assertEquals(replyProcessor.nextIDToHandle.get(), 3);
239         assertEquals(replyProcessor.futureEvents.size(), 0);
240         assertEquals(batchPool.getNumActive(), 0);
241         assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
242         assertTrue(firstBatch.isEmpty());
243         assertTrue(secondBatch.isEmpty());
244         assertTrue(thirdBatch.isEmpty());
245 
246         // Check the method calls have been properly ordered
247 
248         InOrder inOrderReplyBatchEvents = inOrder(replyProcessor, replyProcessor, replyProcessor);
249         inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(firstBatchEvent));
250         inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(secondBatchEvent));
251         inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
252 
253         InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
254         inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
255         inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
256         inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx), any(Optional.class));
257         inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
258         inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx), any(Optional.class));
259 
260     }
261 
262     @Test
263     public void testUpdateLowWaterMarkOnlyForMaxInBatch() throws Exception {
264 
265         Batch thirdBatch = batchPool.borrowObject();
266         thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
267         thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.of(100L));
268         thirdBatch.addCommit(THIRD_ST, THIRD_CT, mock(Channel.class), monCtx, Optional.of(50L));
269         thirdBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
270         thirdBatch.addCommit(FIFTH_ST, FIFTH_CT, mock(Channel.class), monCtx, Optional.of(100L));
271         thirdBatch.addCommit(SIXTH_ST, SIXTH_CT, mock(Channel.class), monCtx, Optional.of(150L));
272 
273         ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
274         ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 0);
275 
276         replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
277 
278         InOrder inOrderWatermarkWriter = inOrder(lowWatermarkWriter, lowWatermarkWriter, lowWatermarkWriter);
279 
280         inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(100L));
281         inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(150L));
282 
283         verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(50L));
284 
285         InOrder inOrderCheckLWM = inOrder(replyProcessor, replyProcessor,replyProcessor,replyProcessor,replyProcessor);
286         inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
287         inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(50L));
288         inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.<Long>absent());
289         inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
290         inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(150L));
291 
292     }
293 
294 }