View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.tso.client;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
21  
22  import org.apache.omid.committable.CommitTable;
23  
24  import java.io.IOException;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.Map;
28  import java.util.HashMap;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  // TODO Would be nice to compile all util classes for testing to a separate package that clients could import for tests
32  public class MockTSOClient implements TSOProtocol {
33  
34      private static final int CONFLICT_MAP_SIZE = 1_000_000;
35  
36      private final AtomicLong timestampGenerator = new AtomicLong();
37      private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
38      private final Map<Long, Long> fenceMap = new HashMap<Long, Long>();
39      private final AtomicLong lwm = new AtomicLong();
40  
41      private final CommitTable.Writer commitTable;
42  
43      public MockTSOClient(CommitTable.Writer commitTable) {
44          this.commitTable = commitTable;
45      }
46  
47      @Override
48      public TSOFuture<Long> getNewStartTimestamp() {
49          synchronized (conflictMap) {
50              SettableFuture<Long> f = SettableFuture.create();
51              f.set(timestampGenerator.incrementAndGet());
52              return new ForwardingTSOFuture<>(f);
53          }
54      }
55  
56      @Override
57      public TSOFuture<Long> getFence(long tableId) {
58          synchronized (conflictMap) {
59              SettableFuture<Long> f = SettableFuture.create();
60              long fenceTimestamp = timestampGenerator.incrementAndGet();
61              f.set(fenceTimestamp);
62              fenceMap.put(tableId, fenceTimestamp);
63              try {
64                  // Persist the fence by using the fence identifier as both the start and commit timestamp.
65                  commitTable.addCommittedTransaction(fenceTimestamp, fenceTimestamp);
66                  commitTable.flush();
67              } catch (IOException ioe) {
68                  f.setException(ioe);
69              }
70              return new ForwardingTSOFuture<>(f);
71          }
72      }
73  
74      // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
75      private boolean hasConflictsWithFences(long transactionId, Set<? extends CellId> cells) {
76          Set<Long> tableIDs = new HashSet<Long>();
77          for (CellId c : cells) {
78              tableIDs.add(c.getTableId());
79          }
80  
81          if (! fenceMap.isEmpty()) {
82              for (long tableId : tableIDs) {
83                  Long fence = fenceMap.get(tableId);
84                  if (fence != null && transactionId < fence) {
85                      return true;
86                  }
87                  if (fence != null && fence < lwm.get()) { // GC
88                      fenceMap.remove(tableId);
89                  }
90              }
91          }
92  
93          return false;
94      }
95  
96      // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
97      private boolean hasConflictsWithCommittedTransactions(long transactionId, Set<? extends CellId> cells) {
98          for (CellId c : cells) {
99              int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
100             if (conflictMap[index] >= transactionId) {
101                 return true;
102             }
103         }
104 
105         return false;
106     }
107 
108     @Override
109     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells, Set<? extends CellId> conflictFreeWriteSet) {
110         return commit(transactionId, cells);
111     }
112 
113     @Override
114     public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
115         synchronized (conflictMap) {
116             SettableFuture<Long> f = SettableFuture.create();
117             if (transactionId < lwm.get()) {
118                 f.setException(new AbortException());
119                 return new ForwardingTSOFuture<>(f);
120             }
121 
122             if (!hasConflictsWithFences(transactionId, cells) &&
123                 !hasConflictsWithCommittedTransactions(transactionId, cells)) {
124 
125                 long commitTimestamp = timestampGenerator.incrementAndGet();
126                 for (CellId c : cells) {
127                     int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
128                     long oldVal = conflictMap[index];
129                     conflictMap[index] = commitTimestamp;
130                     long curLwm = lwm.get();
131                     while (oldVal > curLwm) {
132                         if (lwm.compareAndSet(curLwm, oldVal)) {
133                             break;
134                         }
135                         curLwm = lwm.get();
136                     }
137                 }
138 
139                 f.set(commitTimestamp);
140                 try {
141                     commitTable.addCommittedTransaction(transactionId, commitTimestamp);
142                     commitTable.updateLowWatermark(lwm.get());
143                     commitTable.flush();
144                 } catch (IOException ioe) {
145                     f.setException(ioe);
146                 }
147             } else {
148                 f.setException(new AbortException());
149             }
150             return new ForwardingTSOFuture<>(f);
151         }
152     }
153 
154     @Override
155     public TSOFuture<Void> close() {
156         SettableFuture<Void> f = SettableFuture.create();
157         f.set(null);
158         return new ForwardingTSOFuture<>(f);
159     }
160 
161     @Override
162     public boolean isLowLatency() {
163         return false;
164     }
165 
166     @Override
167     public void setConflictDetectionLevel(OmidClientConfiguration.ConflictDetectionLevel conflictDetectionLevel) {
168 
169     }
170 
171     @Override
172     public OmidClientConfiguration.ConflictDetectionLevel getConflictDetectionLevel() {
173         return null;
174     }
175 
176     @Override
177     public long getEpoch() {
178         return 0;
179     }
180 }