1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.commons.pool2.ObjectPool;
22 import org.apache.omid.committable.CommitTable;
23 import org.apache.omid.metrics.MetricsRegistry;
24 import org.apache.omid.timestamp.storage.TimestampStorage;
25 import org.mockito.Mock;
26 import org.mockito.Mockito;
27 import org.mockito.MockitoAnnotations;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.testng.annotations.AfterMethod;
31 import org.testng.annotations.BeforeMethod;
32 import org.testng.annotations.Test;
33
34 import com.lmax.disruptor.BlockingWaitStrategy;
35
36 import java.io.IOException;
37
38 import static org.mockito.Matchers.any;
39 import static org.mockito.Matchers.anyLong;
40 import static org.mockito.Matchers.anyString;
41 import static org.mockito.Mockito.doReturn;
42 import static org.mockito.Mockito.doThrow;
43 import static org.mockito.Mockito.mock;
44 import static org.mockito.Mockito.spy;
45 import static org.mockito.Mockito.timeout;
46 import static org.mockito.Mockito.verify;
47
48 public class TestPanicker {
49
50 private static final Logger LOG = LoggerFactory.getLogger(TestPanicker.class);
51
52 @Mock
53 private CommitTable.Writer mockWriter;
54 @Mock
55 private MetricsRegistry metrics;
56
57 @BeforeMethod
58 public void initMocksAndComponents() {
59 MockitoAnnotations.initMocks(this);
60 }
61
62 @AfterMethod
63 void afterMethod() {
64 Mockito.reset(mockWriter);
65 }
66
67
68
69
70 @Test(timeOut = 10_000)
71 public void testTimestampOraclePanic() throws Exception {
72
73 TimestampStorage storage = spy(new TimestampOracleImpl.InMemoryTimestampStorage());
74 Panicker panicker = spy(new MockPanicker());
75
76 doThrow(new RuntimeException("Out of memory")).when(storage).updateMaxTimestamp(anyLong(), anyLong());
77
78 final TimestampOracleImpl tso = new TimestampOracleImpl(metrics, storage, panicker);
79 tso.initialize();
80 Thread allocThread = new Thread("AllocThread") {
81 @Override
82 public void run() {
83 while (true) {
84 tso.next();
85 }
86 }
87 };
88 allocThread.start();
89
90 verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
91
92 }
93
94
95
96
97 @Test(timeOut = 10_000)
98 public void testCommitTablePanic() throws Exception {
99
100 Panicker panicker = spy(new MockPanicker());
101
102 doThrow(new IOException("Unable to write@TestPanicker")).when(mockWriter).flush();
103
104 final CommitTable.Client mockClient = mock(CommitTable.Client.class);
105 CommitTable commitTable = new CommitTable() {
106 @Override
107 public Writer getWriter() {
108 return mockWriter;
109 }
110
111 @Override
112 public Client getClient() {
113 return mockClient;
114 }
115 };
116
117 LeaseManager leaseManager = mock(LeaseManager.class);
118 doReturn(true).when(leaseManager).stillInLeasePeriod();
119 TSOServerConfig config = new TSOServerConfig();
120 ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
121
122 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
123 for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
124 handlers[i] = new PersistenceProcessorHandler(metrics,
125 "localhost:1234",
126 leaseManager,
127 commitTable,
128 mock(ReplyProcessor.class),
129 mock(RetryProcessor.class),
130 panicker);
131 }
132
133 PersistenceProcessor proc = new PersistenceProcessorImpl(config,
134 new BlockingWaitStrategy(),
135 commitTable,
136 batchPool,
137 panicker,
138 handlers,
139 metrics);
140
141 proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
142
143 LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
144
145 new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker,
146 mock(TSOServerConfig.class), lowWatermarkWriter, mock(ReplyProcessor.class));
147
148 verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
149
150 }
151
152
153
154
155 @Test(timeOut = 10_000)
156 public void testRuntimeExceptionTakesDownDaemon() throws Exception {
157
158 Panicker panicker = spy(new MockPanicker());
159
160 final CommitTable.Writer mockWriter = mock(CommitTable.Writer.class);
161 doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
162
163 final CommitTable.Client mockClient = mock(CommitTable.Client.class);
164 CommitTable commitTable = new CommitTable() {
165 @Override
166 public Writer getWriter() {
167 return mockWriter;
168 }
169
170 @Override
171 public Client getClient() {
172 return mockClient;
173 }
174 };
175 TSOServerConfig config = new TSOServerConfig();
176 ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
177
178 PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
179 for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
180 handlers[i] = new PersistenceProcessorHandler(metrics,
181 "localhost:1234",
182 mock(LeaseManager.class),
183 commitTable,
184 mock(ReplyProcessor.class),
185 mock(RetryProcessor.class),
186 panicker);
187 }
188
189 PersistenceProcessor proc = new PersistenceProcessorImpl(config,
190 new BlockingWaitStrategy(),
191 commitTable,
192 batchPool,
193 panicker,
194 handlers,
195 metrics);
196 proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
197
198 LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
199
200 new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class),
201 lowWatermarkWriter, mock(ReplyProcessor.class));
202
203 verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
204
205 }
206
207 }