1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import org.apache.omid.tso.ProgrammableTSOServer;
21 import org.apache.omid.tso.ProgrammableTSOServer.AbortResponse;
22 import org.apache.omid.tso.ProgrammableTSOServer.CommitResponse;
23 import org.apache.omid.tso.ProgrammableTSOServer.TimestampResponse;
24 import org.testng.annotations.BeforeClass;
25 import org.testng.annotations.BeforeMethod;
26 import org.testng.annotations.Test;
27
28 import java.io.IOException;
29 import java.util.Collections;
30 import java.util.concurrent.ExecutionException;
31
32 import static org.testng.Assert.assertEquals;
33
34 public class TestTSOClientResponseHandling {
35
36 private static final int TSO_PORT = 4321;
37 private static final long START_TS = 1L;
38 private static final long COMMIT_TS = 2L;
39
40 private ProgrammableTSOServer tsoServer = new ProgrammableTSOServer(TSO_PORT);
41
42 private TSOClient tsoClient;
43
44 @BeforeClass
45 public void configureAndCreateClient() throws IOException, InterruptedException {
46
47 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
48 tsoClientConf.setConnectionString("localhost:" + TSO_PORT);
49 tsoClient = TSOClient.newInstance(tsoClientConf);
50 }
51
52 @BeforeMethod
53 public void reset() {
54 tsoServer.cleanResponses();
55 }
56
57 @Test(timeOut = 10_000)
58 public void testTimestampRequestReceivingASuccessfulResponse() throws Exception {
59
60
61
62 tsoServer.queueResponse(new TimestampResponse(START_TS));
63
64 long startTS = tsoClient.getNewStartTimestamp().get();
65 assertEquals(startTS, START_TS);
66 }
67
68 @Test(timeOut = 10_000)
69 public void testCommitRequestReceivingAnAbortResponse() throws Exception {
70
71
72
73
74
75 tsoServer.queueResponse(new AbortResponse(START_TS));
76
77 try {
78 tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
79 } catch (ExecutionException ee) {
80 assertEquals(ee.getCause().getClass(), AbortException.class);
81 }
82 }
83
84 @Test(timeOut = 10_000)
85 public void testCommitRequestReceivingASuccessfulResponse() throws Exception {
86
87
88
89
90 tsoServer.queueResponse(new CommitResponse(START_TS, COMMIT_TS));
91
92 long commitTS = tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get();
93 assertEquals(commitTS, COMMIT_TS);
94 }
95
96 }