View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import org.apache.omid.committable.CommitTable;
22  import org.apache.omid.metrics.MetricsRegistry;
23  import org.apache.omid.metrics.NullMetricsProvider;
24  import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
25  import org.jboss.netty.channel.Channel;
26  import org.mockito.Mock;
27  import org.mockito.Mockito;
28  import org.mockito.MockitoAnnotations;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.testng.annotations.AfterMethod;
32  import org.testng.annotations.BeforeMethod;
33  import org.testng.annotations.Test;
34  
35  import java.io.IOException;
36  
37  import static org.mockito.Matchers.any;
38  import static org.mockito.Matchers.anyLong;
39  import static org.mockito.Matchers.eq;
40  import static org.mockito.Mockito.doReturn;
41  import static org.mockito.Mockito.doThrow;
42  import static org.mockito.Mockito.mock;
43  import static org.mockito.Mockito.never;
44  import static org.mockito.Mockito.spy;
45  import static org.mockito.Mockito.times;
46  import static org.mockito.Mockito.verify;
47  import static org.testng.Assert.assertEquals;
48  import static org.testng.Assert.assertTrue;
49  import static org.testng.Assert.fail;
50  
51  public class TestPersistenceProcessorHandler {
52  
53      private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessorHandler.class); // Class logger; no test visible in this file writes to it
54  
55      private static final int BATCH_ID = 0; // First constructor argument of every Batch built by the tests
56      private static final int BATCH_SIZE = 6; // Second Batch constructor argument; no test adds more than 6 events
57      private static final long BATCH_SEQUENCE = 0; // Sequence given to makePersistBatch and expected in manageResponsesBatch verifications
58  
59      private static final long FIRST_ST = 0L; // *_ST values are passed as start timestamps, *_CT values as commit timestamps
60      private static final long FIRST_CT = 1L;
61      private static final long SECOND_ST = 2L;
62      private static final long SECOND_CT = 3L;
63      private static final long THIRD_ST = 4L;
64      private static final long THIRD_CT = 5L;
65      private static final long FOURTH_ST = 6L;
66      private static final long FOURTH_CT = 7L;
67      private static final long FIFTH_ST = 8L;
68      private static final long FIFTH_CT = 9L;
69      private static final long SIXTH_ST = 10L;
70  
71      @Mock
72      private CommitTable.Writer mockWriter; // Returned by commitTable.getWriter()
73      @Mock
74      private CommitTable.Client mockClient; // Returned by commitTable.getClient()
75      @Mock
76      private LeaseManager leaseManager; // Stubbed in setup so stillInLeasePeriod() returns true
77      @Mock
78      private ReplyProcessor replyProcessor;
79      @Mock
80      private RetryProcessor retryProcessor;
81      @Mock
82      private Panicker panicker; // Mocked panicker; the panic tests replace it with a spied RuntimeExceptionPanicker
83  
84      private CommitTable commitTable; // Anonymous CommitTable created in setup around mockWriter and mockClient
85  
86      private MetricsRegistry metrics; // Assigned a NullMetricsProvider in setup
87  
88      // Component under test
89      private PersistenceProcessorHandler persistenceHandler; // Wrapped in a Mockito spy so tests can verify its own method calls
90  
91      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
92      public void initMocksAndComponents() throws Exception {
93  
94          MockitoAnnotations.initMocks(this);
95  
96          // Configure null metrics provider
97          metrics = new NullMetricsProvider();
98  
99          // Configure commit table to return the mocked writer and client
100         commitTable = new CommitTable() {
101             @Override
102             public Writer getWriter() {
103                 return mockWriter;
104             }
105 
106             @Override
107             public Client getClient() {
108                 return mockClient;
109             }
110         };
111 
112         // Simulate we're master for most of the tests
113         doReturn(true).when(leaseManager).stillInLeasePeriod();
114 
115         persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
116                                                                  "localhost:1234",
117                                                                  leaseManager,
118                                                                  commitTable,
119                                                                  replyProcessor,
120                                                                  retryProcessor,
121                                                                  panicker));
122 
123     }
124 
125     @AfterMethod
126     void afterMethod() {
127         Mockito.reset(mockWriter);
128     }
129 
130     @Test(timeOut = 1_000)
131     public void testPersistentProcessorHandlerIdsAreCreatedConsecutive() throws Exception {
132 
133         TSOServerConfig tsoConfig = new TSOServerConfig();
134         tsoConfig.setNumConcurrentCTWriters(32);
135 
136         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
137         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
138             handlers[i] = new PersistenceProcessorHandler(metrics,
139                                                           "localhost:1234",
140                                                           mock(LeaseManager.class),
141                                                           commitTable,
142                                                           mock(ReplyProcessor.class),
143                                                           retryProcessor,
144                                                           panicker);
145         }
146 
147         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
148             // Required to generalize the cases when other tests have increased the static variable assigning the ids
149             if (i + 1 < tsoConfig.getNumConcurrentCTWriters()) {
150                 int followingHandlerIdAsInt = Integer.valueOf(handlers[i + 1].getId());
151                 assertEquals(handlers[i].getId(), String.valueOf(followingHandlerIdAsInt - 1));
152             } else { // Final case: compare with the last element that the sequence creator assigned
153                 int followingHandlerIdAsInt = PersistenceProcessorHandler.consecutiveSequenceCreator.get();
154                 assertEquals(handlers[i].getId(), String.valueOf(followingHandlerIdAsInt - 1));
155             }
156         }
157 
158     }
159 
160     @Test(timeOut = 10_000)
161     public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
162 
163         // Prepare test batch
164         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
165         PersistBatchEvent batchEvent = new PersistBatchEvent();
166         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
167         persistenceHandler.onEvent(batchEvent);
168 
169         verify(persistenceHandler, times(1)).flush(eq(0));
170         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
171         verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
172         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
173         assertTrue(batch.isEmpty());
174 
175     }
176 
177     @Test(timeOut = 10_000)
178     public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws Exception {
179 
180         // Prepare test batch
181         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
182         batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
183         PersistBatchEvent batchEvent = new PersistBatchEvent();
184         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
185         persistenceHandler.onEvent(batchEvent);
186 
187         verify(persistenceHandler, times(1)).flush(eq(0));
188         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
189         verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
190         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
191         assertEquals(batch.getNumEvents(), 1);
192         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
193 
194     }
195 
196     @Test(timeOut = 10_000)
197     public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exception {
198 
199         // Prepare test batch
200         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
201         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
202         PersistBatchEvent batchEvent = new PersistBatchEvent();
203         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
204         persistenceHandler.onEvent(batchEvent);
205 
206         verify(persistenceHandler, times(1)).flush(eq(1));
207         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
208         verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
209         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
210         assertEquals(batch.getNumEvents(), 1);
211         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
212         assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
213 
214     }
215 
216     @Test(timeOut = 10_000)
217     public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
218 
219         // Prepare test batch
220         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
221         batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
222         PersistBatchEvent batchEvent = new PersistBatchEvent();
223         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
224         persistenceHandler.onEvent(batchEvent);
225 
226         verify(persistenceHandler, times(1)).flush(eq(0));
227         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
228         verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
229         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
230         assertEquals(batch.getNumEvents(), 1);
231         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
232 
233     }
234 
235     @Test(timeOut = 10_000)
236     public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
237 
238         // Prepare test batch
239         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
240         batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
241         PersistBatchEvent batchEvent = new PersistBatchEvent();
242         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
243 
244         // Call process method
245         persistenceHandler.onEvent(batchEvent);
246 
247         verify(persistenceHandler, times(1)).flush(eq(0));
248         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
249         verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
250         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
251         assertEquals(batch.getNumEvents(), 0);
252 
253     }
254 
255     @Test(timeOut = 10_000)
256     public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
257 
258         // Prepare test batch
259         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
260         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
261         batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
262         PersistBatchEvent batchEvent = new PersistBatchEvent();
263         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
264 
265         // Initial assertion
266         assertEquals(batch.getNumEvents(), 2);
267 
268         // Call process method
269         persistenceHandler.onEvent(batchEvent);
270 
271         verify(persistenceHandler, times(1)).flush(eq(1));
272         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
273         verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
274         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
275         assertEquals(batch.getNumEvents(), 1);
276         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
277         assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
278 
279     }
280 
281     @Test(timeOut = 10_000)
282     public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
283         // ------------------------------------------------------------------------------------------------------------
284         // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry but swapped events
285         // ------------------------------------------------------------------------------------------------------------
286 
287         // Prepare test batch
288         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
289         batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
290         batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
291         PersistBatchEvent batchEvent = new PersistBatchEvent();
292         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
293 
294         // Initial assertion
295         assertEquals(batch.getNumEvents(), 2);
296 
297         // Call process method
298         persistenceHandler.onEvent(batchEvent);
299 
300         verify(persistenceHandler, times(1)).flush(eq(1));
301         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
302         verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
303         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
304         assertEquals(batch.getNumEvents(), 1);
305         assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
306         assertEquals(batch.get(0).getCommitTimestamp(), SECOND_CT);
307 
308     }
309 
310     @Test(timeOut = 10_000)
311     public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
312 
313         // Prepare test batch
314         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
315         batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
316         batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
317         PersistBatchEvent batchEvent = new PersistBatchEvent();
318         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
319 
320         // Initial assertion
321         assertEquals(batch.getNumEvents(), 2);
322 
323         // Call process method
324         persistenceHandler.onEvent(batchEvent);
325 
326         verify(persistenceHandler, times(1)).flush(eq(0));
327         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
328         verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
329         verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
330         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
331         assertEquals(batch.getNumEvents(), 0);
332 
333     }
334 
335     @Test(timeOut = 10_000)
336     public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
337 
338         // Prepare test batch
339         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
340         batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
341         batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class));
342         PersistBatchEvent batchEvent = new PersistBatchEvent();
343         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
344 
345         // Initial assertion
346         assertEquals(batch.getNumEvents(), 2);
347 
348         // Call process method
349         persistenceHandler.onEvent(batchEvent);
350 
351         verify(persistenceHandler, times(1)).flush(eq(0));
352         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
353         verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
354         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
355         assertEquals(batch.getNumEvents(), 2);
356         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
357         assertEquals(batch.get(1).getStartTimestamp(), SECOND_ST);
358 
359     }
360 
361 
362     @Test(timeOut = 10_000)
363     public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents() throws Exception {
364 
365         // Prepare test batch
366         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
367 
368         batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
369         batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
370         batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
371         batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
372         batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
373         batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
374         PersistBatchEvent batchEvent = new PersistBatchEvent();
375         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
376 
377         // Initial assertion
378         assertEquals(batch.getNumEvents(), 6);
379 
380         // Call process method
381         persistenceHandler.onEvent(batchEvent);
382 
383         verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
384         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
385         verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
386         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
387         assertEquals(batch.getNumEvents(), 4);
388         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
389         assertEquals(batch.get(1).getStartTimestamp(), FIFTH_ST);
390         assertEquals(batch.get(1).getCommitTimestamp(), FIFTH_CT);
391         assertEquals(batch.get(2).getStartTimestamp(), THIRD_ST);
392         assertEquals(batch.get(2).getCommitTimestamp(), THIRD_CT);
393         assertEquals(batch.get(3).getStartTimestamp(), FOURTH_ST);
394 
395     }
396 
397     @Test(timeOut = 10_000)
398     public void testPanicPersistingEvents() throws Exception {
399 
400         // User the real panicker
401         Panicker panicker = spy(new RuntimeExceptionPanicker());
402         persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
403                                                                  "localhost:1234",
404                                                                  leaseManager,
405                                                                  commitTable,
406                                                                  replyProcessor,
407                                                                  retryProcessor,
408                                                                  panicker));
409 
410         // Prepare test batch
411         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
412         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
413         PersistBatchEvent batchEvent = new PersistBatchEvent();
414         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
415 
416         doThrow(IOException.class).when(mockWriter).flush();
417 
418         try {
419             persistenceHandler.onEvent(batchEvent);
420             fail();
421         } catch (RuntimeException re) {
422             // Expected
423         }
424 
425         verify(persistenceHandler, times(1)).flush(1);
426         verify(panicker, times(1)).panic(eq("Error persisting commit batch"), any(IOException.class));
427         verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
428         verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
429 
430     }
431 
432     @Test(timeOut = 10_000)
433     public void testPanicBecauseMasterLosesMastership() throws Exception {
434 
435         // ------------------------------------------------------------------------------------------------------------
436         // 1) Test panic before flushing
437         // ------------------------------------------------------------------------------------------------------------
438 
439         // Simulate we lose mastership BEFORE flushing
440         doReturn(false).when(leaseManager).stillInLeasePeriod();
441 
442         // User the real panicker
443         Panicker panicker = spy(new RuntimeExceptionPanicker());
444         persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
445                                                                  "localhost:1234",
446                                                                  leaseManager,
447                                                                  commitTable,
448                                                                  replyProcessor,
449                                                                  retryProcessor,
450                                                                  panicker));
451 
452         // Prepare test batch
453         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
454         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
455         PersistBatchEvent batchEvent = new PersistBatchEvent();
456         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
457 
458         try {
459             persistenceHandler.onEvent(batchEvent);
460             fail();
461         } catch (RuntimeException re) {
462             // Expected
463         }
464         verify(persistenceHandler, times(1)).flush(eq(1));
465         verify(mockWriter, never()).flush();
466         verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
467         verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
468         verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
469 
470         // ------------------------------------------------------------------------------------------------------------
471         // 2) Test panic after flushing
472         // ------------------------------------------------------------------------------------------------------------
473 
474         // Simulate we lose mastership AFTER flushing
475         doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
476 
477         // User the real panicker
478         panicker = spy(new RuntimeExceptionPanicker());
479         persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
480                                                                  "localhost:1234",
481                                                                  leaseManager,
482                                                                  commitTable,
483                                                                  replyProcessor,
484                                                                  retryProcessor,
485                                                                  panicker));
486 
487         // Prepare test batch
488         batch = new Batch(BATCH_ID, BATCH_SIZE);
489         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
490         batchEvent = new PersistBatchEvent();
491         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
492 
493         try {
494             persistenceHandler.onEvent(batchEvent);
495             fail();
496         } catch (RuntimeException re) {
497             // Expected
498         }
499         verify(persistenceHandler, times(1)).flush(eq(1));
500         verify(mockWriter, times(1)).flush();
501         verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
502         verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
503         verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
504 
505     }
506 
507 }