1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import com.lmax.disruptor.YieldingWaitStrategy;
22
23 import org.apache.commons.pool2.ObjectPool;
24 import org.apache.omid.committable.CommitTable;
25 import org.apache.omid.committable.CommitTable.CommitTimestamp;
26 import org.apache.omid.committable.InMemoryCommitTable;
27 import org.apache.omid.metrics.MetricsRegistry;
28 import org.jboss.netty.channel.Channel;
29 import org.mockito.ArgumentCaptor;
30 import org.mockito.Mock;
31 import org.mockito.MockitoAnnotations;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.testng.Assert;
35 import org.testng.annotations.BeforeMethod;
36 import org.testng.annotations.Test;
37
38 import static org.mockito.Matchers.any;
39 import static org.mockito.Mockito.timeout;
40 import static org.mockito.Mockito.verify;
41 import static org.testng.Assert.assertEquals;
42
43 public class TestRetryProcessor {
44
45 private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
46
47 private static long NON_EXISTING_ST_TX = 1000;
48 private static long ST_TX_1 = 0;
49 private static long CT_TX_1 = 1;
50
51 @Mock
52 private Channel channel;
53 @Mock
54 private ReplyProcessor replyProc;
55 @Mock
56 private Panicker panicker;
57 @Mock
58 private MetricsRegistry metrics;
59 @Mock
60 private MonitoringContextImpl monCtx;
61
62 private CommitTable commitTable;
63
64 @BeforeMethod(alwaysRun = true, timeOut = 30_000)
65 public void initMocksAndComponents() {
66 MockitoAnnotations.initMocks(this);
67
68 commitTable = new InMemoryCommitTable();
69 }
70
71 @Test(timeOut = 10_000)
72 public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
73 ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
74
75
76 RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
77
78
79 retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx);
80 ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
81
82 verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
83 long startTS = firstTSCapture.getValue();
84 assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
85 }
86
87 @Test(timeOut = 10_000)
88 public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
89 ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
90
91
92 RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
93
94
95 commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
96 retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
97 ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
98 ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
99
100 verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
101 secondTSCapture.capture(),
102 any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
103
104 long startTS = firstTSCapture.getValue();
105 long commitTS = secondTSCapture.getValue();
106 assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
107 assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
108
109 }
110
111 @Test(timeOut = 10_000)
112 public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {
113
114
115 commitTable.getClient().tryInvalidateTransaction(ST_TX_1);
116
117
118
119 Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
120 Assert.assertTrue(invalidTxMarker.isPresent());
121 Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);
122
123 ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
124
125
126 RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
127
128
129 retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
130 ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
131 verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
132 long startTS = startTSCapture.getValue();
133 Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
134
135 }
136
137 }
138