View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21  import com.lmax.disruptor.YieldingWaitStrategy;
22  
23  import org.apache.commons.pool2.ObjectPool;
24  import org.apache.omid.committable.CommitTable;
25  import org.apache.omid.committable.CommitTable.CommitTimestamp;
26  import org.apache.omid.committable.InMemoryCommitTable;
27  import org.apache.omid.metrics.MetricsRegistry;
28  import org.jboss.netty.channel.Channel;
29  import org.mockito.ArgumentCaptor;
30  import org.mockito.Mock;
31  import org.mockito.MockitoAnnotations;
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  import org.testng.Assert;
35  import org.testng.annotations.BeforeMethod;
36  import org.testng.annotations.Test;
37  
38  import static org.mockito.Matchers.any;
39  import static org.mockito.Mockito.timeout;
40  import static org.mockito.Mockito.verify;
41  import static org.testng.Assert.assertEquals;
42  
43  public class TestRetryProcessor {
44  
45      private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
46  
47      private static long NON_EXISTING_ST_TX = 1000;
48      private static long ST_TX_1 = 0;
49      private static long CT_TX_1 = 1;
50  
51      @Mock
52      private Channel channel;
53      @Mock
54      private ReplyProcessor replyProc;
55      @Mock
56      private Panicker panicker;
57      @Mock
58      private MetricsRegistry metrics;
59      @Mock
60      private MonitoringContextImpl monCtx;
61  
62      private CommitTable commitTable;
63  
64      @BeforeMethod(alwaysRun = true, timeOut = 30_000)
65      public void initMocksAndComponents() {
66          MockitoAnnotations.initMocks(this);
67          // Init components
68          commitTable = new InMemoryCommitTable();
69      }
70  
71      @Test(timeOut = 10_000)
72      public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
73          ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
74  
75          // The element to test
76          RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
77  
78          // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
79          retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx);
80          ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
81  
82          verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
83          long startTS = firstTSCapture.getValue();
84          assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
85      }
86  
87      @Test(timeOut = 10_000)
88      public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
89          ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
90  
91          // The element to test
92          RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
93  
94          // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
95          commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
96          retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
97          ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
98          ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
99  
100         verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
101                                                                     secondTSCapture.capture(),
102                                                                     any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
103 
104         long startTS = firstTSCapture.getValue();
105         long commitTS = secondTSCapture.getValue();
106         assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
107         assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
108 
109     }
110 
111     @Test(timeOut = 10_000)
112     public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {
113 
114         // Invalidate the transaction
115         commitTable.getClient().tryInvalidateTransaction(ST_TX_1);
116 
117         // Pre-start verification: Validate that the transaction is invalidated
118         // NOTE: This test should be in the a test class for InMemoryCommitTable
119         Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
120         Assert.assertTrue(invalidTxMarker.isPresent());
121         Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);
122 
123         ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
124 
125         // The element to test
126         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
127 
128         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
129         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
130         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
131         verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
132         long startTS = startTSCapture.getValue();
133         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
134 
135     }
136 
137 }
138