1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20
21 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
23 import com.google.inject.Guice;
24 import com.google.inject.Injector;
25 import com.google.inject.Module;
26 import org.apache.omid.TestUtils;
27 import org.apache.omid.committable.CommitTable;
28 import org.apache.omid.proto.TSOProto;
29 import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
30 import org.apache.omid.tso.client.CellId;
31 import org.apache.omid.tso.client.TSOClient;
32 import org.apache.omid.tso.client.TSOClientOneShot;
33 import org.apache.omid.tso.util.DummyCellIdImpl;
34 import org.testng.annotations.Test;
35 import org.apache.omid.tso.client.OmidClientConfiguration;
36 import org.mockito.Mock;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.testng.annotations.AfterMethod;
40 import org.testng.annotations.BeforeMethod;
41 import java.util.Set;
42 import static org.testng.Assert.assertTrue;
43
44 public class TestTSOLL {
45
46 private static final Logger LOG = LoggerFactory.getLogger(TestTSOLL.class);
47
48 private static final String TSO_SERVER_HOST = "localhost";
49 private static final int TSO_SERVER_PORT = 1234;
50
51
52 private OmidClientConfiguration tsoClientConf;
53
54
55 private TSOServer tsoServer;
56 private PausableTimestampOracle pausableTSOracle;
57 private CommitTable commitTable;
58
59 private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
60 private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
61
62 private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
63
64 @Mock
65 ReplyProcessor replyProcessor;
66
67 @BeforeMethod
68 public void beforeMethod() throws Exception {
69
70 TSOServerConfig tsoConfig = new TSOServerConfig();
71 tsoConfig.setLowLatency(true);
72 tsoConfig.setConflictMapSize(1000);
73 tsoConfig.setPort(TSO_SERVER_PORT);
74 tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
75 tsoConfig.setNumConcurrentCTWriters(2);
76 Module tsoServerMockModule = new TSOMockModule(tsoConfig);
77 Injector injector = Guice.createInjector(tsoServerMockModule);
78
79 LOG.info("==================================================================================================");
80 LOG.info("======================================= Init TSO Server ==========================================");
81 LOG.info("==================================================================================================");
82
83 tsoServer = injector.getInstance(TSOServer.class);
84 tsoServer.startAsync();
85 tsoServer.awaitRunning();
86 TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
87
88 LOG.info("==================================================================================================");
89 LOG.info("===================================== TSO Server Initialized =====================================");
90 LOG.info("==================================================================================================");
91
92 pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
93 commitTable = injector.getInstance(CommitTable.class);
94
95 OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
96 tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
97
98 this.tsoClientConf = tsoClientConf;
99 commitTable = injector.getInstance(CommitTable.class);
100 replyProcessor = injector.getInstance(ReplyProcessor.class);
101 }
102
103 @AfterMethod
104 public void afterMethod() throws Exception {
105
106
107 tsoServer.stopAsync();
108 tsoServer.awaitTerminated();
109 tsoServer = null;
110 TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
111
112 pausableTSOracle.resume();
113
114 }
115
116 @Test(timeOut = 30_000)
117 public void testNoWriteToCommitTable() throws Exception {
118
119 TSOClient client = TSOClient.newInstance(tsoClientConf);
120 TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
121 long ts1 = client.getNewStartTimestamp().get();
122
123 TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
124 assertTrue(response1.getCommitResponse().hasCommitTimestamp());
125 Optional<CommitTable.CommitTimestamp> cts = commitTable.getClient().getCommitTimestamp(ts1).get();
126
127 assertTrue(cts.isPresent() == false);
128 }
129
130 private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
131 TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
132 TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
133 commitBuilder.setStartTimestamp(ts);
134 commitBuilder.setIsRetry(retry);
135 for (CellId cell : writeSet) {
136 commitBuilder.addCellId(cell.getCellId());
137 }
138 return builder.setCommitRequest(commitBuilder.build()).build();
139 }
140 }