1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.benchmarks.tso;
19
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.omid.benchmarks.utils.IntegerGenerator;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.Counter;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.Timer;
import org.apache.omid.tso.util.DummyCellIdImpl;
import org.apache.omid.tso.client.AbortException;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.TSOClient;
import org.apache.omid.tso.client.TSOFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.codahale.metrics.MetricRegistry.name;
53
54 class RawTxRunner implements Runnable {
55
56 private static final Logger LOG = LoggerFactory.getLogger(RawTxRunner.class);
57
58 private static volatile int txRunnerCounter = 0;
59 private int txRunnerId = txRunnerCounter++;
60
61
62 private final int writesetSize;
63 private final boolean fixedWriteSetSize;
64 private final long commitDelayInMs;
65 private final int percentageOfReadOnlyTxs;
66 private final IntegerGenerator cellIdGenerator;
67 private final Random randomGen;
68
69
70 private final TSOClient tsoClient;
71 private final CommitTable.Client commitTableClient;
72
73
74 private final ScheduledExecutorService callbackExec =
75 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
76 .setNameFormat("tx-runner-" + txRunnerId + "-callback")
77 .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
78 @Override
79 public void uncaughtException(Thread t, Throwable e) {
80 LOG.error("Thread {} threw exception", t, e);
81 }
82 }).build());
83
84
85 private final Timer timestampTimer;
86 private final Timer commitTimer;
87 private final Timer abortTimer;
88 private final Counter errorCounter;
89
90
91 private final RateLimiter rateLimiter;
92
93
94 private volatile boolean isRunning = false;
95
96 RawTxRunner(final TSOServerBenchmarkConfig expConfig) throws IOException, InterruptedException {
97
98
99 List<Module> guiceModules = new ArrayList<>();
100 guiceModules.add(new Module() {
101 @Override
102 public void configure(Binder binder) {
103 binder.bind(MetricsRegistry.class).toInstance(expConfig.getMetrics());
104 }
105 });
106 guiceModules.add(expConfig.getCommitTableStoreModule());
107 Injector injector = Guice.createInjector(guiceModules);
108
109
110 this.writesetSize = expConfig.getWritesetSize();
111 this.fixedWriteSetSize = expConfig.isFixedWritesetSize();
112 this.commitDelayInMs = expConfig.getCommitDelayInMs();
113 this.percentageOfReadOnlyTxs = expConfig.getPercentageOfReadOnlyTxs();
114 this.cellIdGenerator = expConfig.getCellIdGenerator();
115 this.randomGen = new Random(System.currentTimeMillis() * txRunnerId);
116
117 int txRateInReqPerSec = expConfig.getTxRateInRequestPerSecond();
118 long warmUpPeriodInSecs = expConfig.getWarmUpPeriodInSecs();
119
120 LOG.info("TxRunner-{} [ Tx Rate (Req per Sec) -> {} ]", txRunnerId, txRateInReqPerSec);
121 LOG.info("TxRunner-{} [ Warm Up Period -> {} Secs ]", txRunnerId, warmUpPeriodInSecs);
122 LOG.info("TxRunner-{} [ Cell Id Distribution Generator -> {} ]", txRunnerId, expConfig.getCellIdGenerator().getClass());
123 LOG.info("TxRunner-{} [ Max Tx Size -> {} Fixed: {} ]", txRunnerId, writesetSize, fixedWriteSetSize);
124 LOG.info("TxRunner-{} [ Commit delay -> {} Ms ]", txRunnerId, commitDelayInMs);
125 LOG.info("TxRunner-{} [ % of Read-Only Tx -> {} % ]", txRunnerId, percentageOfReadOnlyTxs);
126
127
128 CommitTable commitTable = injector.getInstance(CommitTable.class);
129 this.commitTableClient = commitTable.getClient();
130
131
132 MetricsRegistry metrics = injector.getInstance(MetricsRegistry.class);
133 String hostName = InetAddress.getLocalHost().getHostName();
134 this.timestampTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "timestamp"));
135 this.commitTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "commit"));
136 this.abortTimer = metrics.timer(name("tx_runner", Integer.toString(txRunnerId), hostName, "abort"));
137 this.errorCounter = metrics.counter(name("tx_runner", Integer.toString(txRunnerId), hostName, "errors"));
138 LOG.info("TxRunner-{} [ Metrics provider module -> {} ]", txRunnerId, expConfig.getMetrics().getClass());
139
140
141 OmidClientConfiguration tsoClientConf = expConfig.getOmidClientConfiguration();
142 this.tsoClient = TSOClient.newInstance(tsoClientConf);
143 LOG.info("TxRunner-{} [ Connection Type {}/Connection String {} ]", txRunnerId,
144 tsoClientConf.getConnectionType(), tsoClientConf.getConnectionString());
145
146
147 this.rateLimiter = RateLimiter.create((double) txRateInReqPerSec, warmUpPeriodInSecs, TimeUnit.SECONDS);
148 }
149
150 @Override
151 public void run() {
152
153 isRunning = true;
154
155 while (isRunning) {
156 rateLimiter.acquire();
157 long tsRequestTime = System.nanoTime();
158 final TSOFuture<Long> tsFuture = tsoClient.getNewStartTimestamp();
159 tsFuture.addListener(new TimestampListener(tsFuture, tsRequestTime), callbackExec);
160 }
161
162 shutdown();
163
164 }
165
166 void stop() {
167 isRunning = false;
168 }
169
170 private void shutdown() {
171
172 try {
173 LOG.info("Finishing TxRunner in 3 secs", txRunnerId);
174 boolean wasSuccess = callbackExec.awaitTermination(3, TimeUnit.SECONDS);
175 if (!wasSuccess) {
176 callbackExec.shutdownNow();
177 }
178 tsoClient.close().get();
179 } catch (InterruptedException e) {
180 Thread.currentThread().interrupt();
181
182 } catch (ExecutionException e) {
183
184 } finally {
185 LOG.info("TxRunner {} finished", txRunnerId);
186 }
187
188 }
189
190 private class TimestampListener implements Runnable {
191
192 final TSOFuture<Long> tsFuture;
193 final long tsRequestTime;
194
195 TimestampListener(TSOFuture<Long> tsFuture, long tsRequestTime) {
196 this.tsFuture = tsFuture;
197 this.tsRequestTime = tsRequestTime;
198 }
199
200 @Override
201 public void run() {
202
203 try {
204 long txId = tsFuture.get();
205 timestampTimer.update(System.nanoTime() - tsRequestTime);
206 if (commitDelayInMs <= 0) {
207 callbackExec.execute(new Committer(txId));
208 } else {
209 callbackExec.schedule(new Committer(txId), commitDelayInMs, TimeUnit.MILLISECONDS);
210 }
211 } catch (InterruptedException e) {
212 Thread.currentThread().interrupt();
213 errorCounter.inc();
214 } catch (ExecutionException e) {
215 errorCounter.inc();
216 }
217
218 }
219
220 }
221
222 private class Committer implements Runnable {
223
224 final long txId;
225
226 Committer(long txId) {
227 this.txId = txId;
228 }
229
230 @Override
231 public void run() {
232
233 int txWritesetSize = calculateTxWritesetSize();
234
235 if (txWritesetSize == 0) {
236 return;
237 }
238
239 final Set<CellId> cells = new HashSet<>();
240 for (byte i = 0; i < txWritesetSize; i++) {
241 long cellId = cellIdGenerator.nextInt();
242 cells.add(new DummyCellIdImpl(cellId));
243 }
244
245 long startCommitTimeInNs = System.nanoTime();
246 final TSOFuture<Long> commitFuture = tsoClient.commit(txId, cells);
247 commitFuture.addListener(new CommitListener(txId, commitFuture, startCommitTimeInNs), callbackExec);
248
249 }
250
251 private int calculateTxWritesetSize() {
252 int txSize = 0;
253 boolean readOnly = (randomGen.nextFloat() * 100) < percentageOfReadOnlyTxs;
254 if (!readOnly) {
255 if (fixedWriteSetSize) {
256 txSize = writesetSize;
257 } else {
258 txSize = randomGen.nextInt(writesetSize) + 1;
259 }
260 }
261 return txSize;
262 }
263
264 }
265
266 private class CommitListener implements Runnable {
267
268 final long txId;
269 final long commitRequestTime;
270 final TSOFuture<Long> commitFuture;
271
272 CommitListener(long txId, TSOFuture<Long> commitFuture, long commitRequestTime) {
273 this.txId = txId;
274 this.commitFuture = commitFuture;
275 this.commitRequestTime = commitRequestTime;
276 }
277
278 @Override
279 public void run() {
280
281 try {
282 commitFuture.get();
283 commitTableClient.deleteCommitEntry(txId).get();
284 commitTimer.update(System.nanoTime() - commitRequestTime);
285 } catch (InterruptedException e) {
286 Thread.currentThread().interrupt();
287 errorCounter.inc();
288 } catch (ExecutionException e) {
289 if (e.getCause() instanceof AbortException) {
290 abortTimer.update(System.nanoTime() - commitRequestTime);
291 } else {
292 errorCounter.inc();
293 }
294 }
295
296 }
297
298 }
299
300 }