1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso.client;
19
20 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
21
22 import org.apache.omid.committable.CommitTable;
23
24 import java.io.IOException;
25 import java.util.HashSet;
26 import java.util.Set;
27 import java.util.Map;
28 import java.util.HashMap;
29 import java.util.concurrent.atomic.AtomicLong;
30
31
32 public class MockTSOClient implements TSOProtocol {
33
34 private static final int CONFLICT_MAP_SIZE = 1_000_000;
35
36 private final AtomicLong timestampGenerator = new AtomicLong();
37 private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
38 private final Map<Long, Long> fenceMap = new HashMap<Long, Long>();
39 private final AtomicLong lwm = new AtomicLong();
40
41 private final CommitTable.Writer commitTable;
42
43 public MockTSOClient(CommitTable.Writer commitTable) {
44 this.commitTable = commitTable;
45 }
46
47 @Override
48 public TSOFuture<Long> getNewStartTimestamp() {
49 synchronized (conflictMap) {
50 SettableFuture<Long> f = SettableFuture.create();
51 f.set(timestampGenerator.incrementAndGet());
52 return new ForwardingTSOFuture<>(f);
53 }
54 }
55
56 @Override
57 public TSOFuture<Long> getFence(long tableId) {
58 synchronized (conflictMap) {
59 SettableFuture<Long> f = SettableFuture.create();
60 long fenceTimestamp = timestampGenerator.incrementAndGet();
61 f.set(fenceTimestamp);
62 fenceMap.put(tableId, fenceTimestamp);
63 try {
64
65 commitTable.addCommittedTransaction(fenceTimestamp, fenceTimestamp);
66 commitTable.flush();
67 } catch (IOException ioe) {
68 f.setException(ioe);
69 }
70 return new ForwardingTSOFuture<>(f);
71 }
72 }
73
74
75 private boolean hasConflictsWithFences(long transactionId, Set<? extends CellId> cells) {
76 Set<Long> tableIDs = new HashSet<Long>();
77 for (CellId c : cells) {
78 tableIDs.add(c.getTableId());
79 }
80
81 if (! fenceMap.isEmpty()) {
82 for (long tableId : tableIDs) {
83 Long fence = fenceMap.get(tableId);
84 if (fence != null && transactionId < fence) {
85 return true;
86 }
87 if (fence != null && fence < lwm.get()) {
88 fenceMap.remove(tableId);
89 }
90 }
91 }
92
93 return false;
94 }
95
96
97 private boolean hasConflictsWithCommittedTransactions(long transactionId, Set<? extends CellId> cells) {
98 for (CellId c : cells) {
99 int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
100 if (conflictMap[index] >= transactionId) {
101 return true;
102 }
103 }
104
105 return false;
106 }
107
108 @Override
109 public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells, Set<? extends CellId> conflictFreeWriteSet) {
110 return commit(transactionId, cells);
111 }
112
113 @Override
114 public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
115 synchronized (conflictMap) {
116 SettableFuture<Long> f = SettableFuture.create();
117 if (transactionId < lwm.get()) {
118 f.setException(new AbortException());
119 return new ForwardingTSOFuture<>(f);
120 }
121
122 if (!hasConflictsWithFences(transactionId, cells) &&
123 !hasConflictsWithCommittedTransactions(transactionId, cells)) {
124
125 long commitTimestamp = timestampGenerator.incrementAndGet();
126 for (CellId c : cells) {
127 int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
128 long oldVal = conflictMap[index];
129 conflictMap[index] = commitTimestamp;
130 long curLwm = lwm.get();
131 while (oldVal > curLwm) {
132 if (lwm.compareAndSet(curLwm, oldVal)) {
133 break;
134 }
135 curLwm = lwm.get();
136 }
137 }
138
139 f.set(commitTimestamp);
140 try {
141 commitTable.addCommittedTransaction(transactionId, commitTimestamp);
142 commitTable.updateLowWatermark(lwm.get());
143 commitTable.flush();
144 } catch (IOException ioe) {
145 f.setException(ioe);
146 }
147 } else {
148 f.setException(new AbortException());
149 }
150 return new ForwardingTSOFuture<>(f);
151 }
152 }
153
154 @Override
155 public TSOFuture<Void> close() {
156 SettableFuture<Void> f = SettableFuture.create();
157 f.set(null);
158 return new ForwardingTSOFuture<>(f);
159 }
160
161 @Override
162 public boolean isLowLatency() {
163 return false;
164 }
165
166 @Override
167 public void setConflictDetectionLevel(OmidClientConfiguration.ConflictDetectionLevel conflictDetectionLevel) {
168
169 }
170
171 @Override
172 public OmidClientConfiguration.ConflictDetectionLevel getConflictDetectionLevel() {
173 return null;
174 }
175
176 @Override
177 public long getEpoch() {
178 return 0;
179 }
180 }