View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso;
19  
20  
21  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22  import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
23  import com.google.inject.Guice;
24  import com.google.inject.Injector;
25  import com.google.inject.Module;
26  import org.apache.omid.TestUtils;
27  import org.apache.omid.committable.CommitTable;
28  import org.apache.omid.proto.TSOProto;
29  import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
30  import org.apache.omid.tso.client.CellId;
31  import org.apache.omid.tso.client.TSOClient;
32  import org.apache.omid.tso.client.TSOClientOneShot;
33  import org.apache.omid.tso.util.DummyCellIdImpl;
34  import org.testng.annotations.Test;
35  import org.apache.omid.tso.client.OmidClientConfiguration;
36  import org.mockito.Mock;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  import org.testng.annotations.AfterMethod;
40  import org.testng.annotations.BeforeMethod;
41  import java.util.Set;
42  import static org.testng.Assert.assertTrue;
43  
44  public class TestTSOLL {
45  
46      private static final Logger LOG = LoggerFactory.getLogger(TestTSOLL.class);
47  
48      private static final String TSO_SERVER_HOST = "localhost";
49      private static final int TSO_SERVER_PORT = 1234;
50  
51  
52      private OmidClientConfiguration tsoClientConf;
53  
54      // Required infrastructure for TSOClient test
55      private TSOServer tsoServer;
56      private PausableTimestampOracle pausableTSOracle;
57      private CommitTable commitTable;
58  
59      private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
60      private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
61  
62      private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
63  
64      @Mock
65      ReplyProcessor replyProcessor;
66  
67      @BeforeMethod
68      public void beforeMethod() throws Exception {
69  
70          TSOServerConfig tsoConfig = new TSOServerConfig();
71          tsoConfig.setLowLatency(true);
72          tsoConfig.setConflictMapSize(1000);
73          tsoConfig.setPort(TSO_SERVER_PORT);
74          tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
75          tsoConfig.setNumConcurrentCTWriters(2);
76          Module tsoServerMockModule = new TSOMockModule(tsoConfig);
77          Injector injector = Guice.createInjector(tsoServerMockModule);
78  
79          LOG.info("==================================================================================================");
80          LOG.info("======================================= Init TSO Server ==========================================");
81          LOG.info("==================================================================================================");
82  
83          tsoServer = injector.getInstance(TSOServer.class);
84          tsoServer.startAsync();
85          tsoServer.awaitRunning();
86          TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
87  
88          LOG.info("==================================================================================================");
89          LOG.info("===================================== TSO Server Initialized =====================================");
90          LOG.info("==================================================================================================");
91  
92          pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
93          commitTable = injector.getInstance(CommitTable.class);
94  
95          OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
96          tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
97  
98          this.tsoClientConf = tsoClientConf;
99          commitTable = injector.getInstance(CommitTable.class);
100         replyProcessor = injector.getInstance(ReplyProcessor.class);
101     }
102 
103     @AfterMethod
104     public void afterMethod() throws Exception {
105 
106 
107         tsoServer.stopAsync();
108         tsoServer.awaitTerminated();
109         tsoServer = null;
110         TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
111 
112         pausableTSOracle.resume();
113 
114     }
115 
116     @Test(timeOut = 30_000)
117     public void testNoWriteToCommitTable() throws Exception {
118 
119         TSOClient client = TSOClient.newInstance(tsoClientConf);
120         TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
121         long ts1 = client.getNewStartTimestamp().get();
122 
123         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
124         assertTrue(response1.getCommitResponse().hasCommitTimestamp());
125         Optional<CommitTable.CommitTimestamp> cts = commitTable.getClient().getCommitTimestamp(ts1).get();
126 
127         assertTrue(cts.isPresent() == false);
128     }
129 
130     private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
131         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
132         TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
133         commitBuilder.setStartTimestamp(ts);
134         commitBuilder.setIsRetry(retry);
135         for (CellId cell : writeSet) {
136             commitBuilder.addCellId(cell.getCellId());
137         }
138         return builder.setCommitRequest(commitBuilder.build()).build();
139     }
140 }