1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.omid.transaction;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21
22 import org.apache.omid.committable.CommitTable;
23 import org.apache.omid.tso.client.CellId;
24
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30
31 /**
32 * Omid's base abstract implementation of the {@link Transaction} interface.
33 * Provides extra methods to access other basic transaction state required by
34 * {@link TransactionManager} implementations based on snapshot
35 * isolation.
36 *
37 * So, this abstract class must be extended by particular implementations of
38 * transaction managers related to different storage systems (HBase...)
39 */
40 public abstract class AbstractTransaction<T extends CellId> implements Transaction {
41
42 public enum VisibilityLevel {
43 // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction
44 // Sets the readTimestamp to be the writeTimestamp
45 SNAPSHOT,
46 // Returns all the written version of a key X that written by the transaction and the key X from the provided snapshot.
47 SNAPSHOT_ALL,
48 // Returns the last key, either from the snapshot or from the current transaction that was written before the last checkpoint.
49 // Sets the readTimestamp to be the writeTimestamp - 1
50 SNAPSHOT_EXCLUDE_CURRENT;
51
52 public static VisibilityLevel fromInteger(int number) {
53 VisibilityLevel visibilityLevel = SNAPSHOT;
54
55 switch (number) {
56 case 0:
57 visibilityLevel = VisibilityLevel.SNAPSHOT;
58 break;
59 case 1:
60 visibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
61 break;
62 case 2:
63 visibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
64 break;
65 default:
66 assert(false);
67 }
68
69 return visibilityLevel;
70 }
71 }
72
73 private transient Map<String, Object> metadata = new HashMap<>();
74 private final AbstractTransactionManager transactionManager;
75 private final long startTimestamp;
76 protected long readTimestamp;
77 protected long writeTimestamp;
78 private final long epoch;
79 private long commitTimestamp;
80 private boolean isRollbackOnly;
81 private final Set<T> writeSet;
82 private final Set<T> conflictFreeWriteSet;
83 private Status status = Status.RUNNING;
84 private VisibilityLevel visibilityLevel;
85 private final boolean isLowLatency;
86
87 /**
88 * Base constructor
89 *
90 * @param transactionId
91 * transaction identifier to assign
92 * @param epoch
93 * epoch of the TSOServer instance that created this transaction
94 * Used in High Availability to guarantee data consistency
95 * @param writeSet
96 * initial write set for the transaction.
97 * Should be empty in most cases.
98 * @param conflictFreeWriteSet
99 * initial conflict free write set for the transaction.
100 * Should be empty in most cases.
101 * @param transactionManager
102 * transaction manager associated to this transaction.
103 * Usually, should be the one that created the transaction
104 * instance.
105 */
106 public AbstractTransaction(long transactionId,
107 long epoch,
108 Set<T> writeSet,
109 Set<T> conflictFreeWriteSet,
110 AbstractTransactionManager transactionManager,
111 boolean isLowLatency) {
112 this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet,
113 transactionManager, isLowLatency);
114 }
115
116 public AbstractTransaction(long transactionId,
117 long readTimestamp,
118 VisibilityLevel visibilityLevel,
119 long epoch,
120 Set<T> writeSet,
121 Set<T> conflictFreeWriteSet,
122 AbstractTransactionManager transactionManager,
123 boolean isLowLatency) {
124
125 this.startTimestamp = this.writeTimestamp = transactionId;
126 this.readTimestamp = readTimestamp;
127 this.epoch = epoch;
128 this.writeSet = writeSet;
129 this.conflictFreeWriteSet = conflictFreeWriteSet;
130 this.transactionManager = transactionManager;
131 this.visibilityLevel = visibilityLevel;
132 this.isLowLatency = isLowLatency;
133 }
134
135 /**
136 * Base constructor
137 *
138 * @param transactionId
139 * transaction identifier to assign
140 * @param epoch
141 * epoch of the TSOServer instance that created this transaction
142 * Used in High Availability to guarantee data consistency
143 * @param writeSet
144 * initial write set for the transaction.
145 * Should be empty in most cases.
146 * @param transactionManager
147 * transaction manager associated to this transaction.
148 * Usually, should be the one that created the transaction
149 * instance.
150 * @param readTimestamp
151 * the snapshot to read from
152 * @param writeTimestamp
153 * the timestamp to write to
154 *
155 */
156 public AbstractTransaction(long transactionId,
157 long epoch,
158 Set<T> writeSet,
159 Set<T> conflictFreeWriteSet,
160 AbstractTransactionManager transactionManager,
161 long readTimestamp,
162 long writeTimestamp,
163 boolean isLowLatency) {
164 this.startTimestamp = transactionId;
165 this.readTimestamp = readTimestamp;
166 this.writeTimestamp = writeTimestamp;
167 this.epoch = epoch;
168 this.writeSet = writeSet;
169 this.conflictFreeWriteSet = conflictFreeWriteSet;
170 this.transactionManager = transactionManager;
171 this.visibilityLevel = VisibilityLevel.SNAPSHOT;
172 this.isLowLatency = isLowLatency;
173 }
174
175 /**
176 * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT
177 * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation
178 * @throws TransactionException
179 */
180 public void checkpoint() throws TransactionException {
181
182 setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
183 this.readTimestamp = this.writeTimestamp++;
184
185 if (this.writeTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN == 0) {
186 throw new TransactionException("Error: number of checkpoing cannot exceed " + (CommitTable.MAX_CHECKPOINTS_PER_TXN - 1));
187 }
188 }
189
190 /**
191 * Allows to define specific clean-up task for transaction implementations
192 */
193 public abstract void cleanup();
194
195 /**
196 * @see org.apache.omid.transaction.Transaction#getTransactionId()
197 */
198 @Override
199 public long getTransactionId() {
200 return startTimestamp;
201 }
202
203 /**
204 * @see org.apache.omid.transaction.Transaction#getEpoch()
205 */
206 @Override
207 public long getEpoch() {
208 return epoch;
209 }
210
211 /**
212 * @see org.apache.omid.transaction.Transaction#getStatus()
213 */
214 @Override
215 public Status getStatus() {
216 return status;
217 }
218
219 /**
220 * @see Transaction#isRollbackOnly()
221 */
222 @Override
223 public void setRollbackOnly() {
224 isRollbackOnly = true;
225 }
226
227 /**
228 * @see org.apache.omid.transaction.Transaction#isRollbackOnly()
229 */
230 @Override
231 public boolean isRollbackOnly() {
232 return isRollbackOnly;
233 }
234
235 /**
236 * Returns transaction manager associated to this transaction.
237 * @return transaction manager
238 */
239 public AbstractTransactionManager getTransactionManager() {
240 return transactionManager;
241 }
242
243 /**
244 * Returns the start timestamp for this transaction.
245 * @return start timestamp
246 */
247 public long getStartTimestamp() {
248 return startTimestamp;
249 }
250
251 /**
252 * Returns the read timestamp for this transaction.
253 * @return read timestamp
254 */
255 @Override
256 public long getReadTimestamp() {
257 return readTimestamp;
258 }
259
260 /**
261 * Returns the write timestamp for this transaction.
262 * @return write timestamp
263 */
264 @Override
265 public long getWriteTimestamp() {
266 return writeTimestamp;
267 }
268
269 /**
270 * Returns the commit timestamp for this transaction.
271 * @return commit timestamp
272 */
273 public long getCommitTimestamp() {
274 return commitTimestamp;
275 }
276
277 /**
278 * Returns the visibility level for this transaction.
279 * @return visibility level
280 */
281 public VisibilityLevel getVisibilityLevel() {
282 return visibilityLevel;
283 }
284
285 /**
286 * Sets the commit timestamp for this transaction.
287 * @param commitTimestamp
288 * the commit timestamp to set
289 */
290 public void setCommitTimestamp(long commitTimestamp) {
291 this.commitTimestamp = commitTimestamp;
292 }
293
294 /**
295 * Sets the visibility level for this transaction.
296 * @param visibilityLevel
297 * the {@link VisibilityLevel} to set
298 */
299 public void setVisibilityLevel(VisibilityLevel visibilityLevel) {
300 this.visibilityLevel = visibilityLevel;
301
302 // If we are setting visibility level to either SNAPSHOT or SNAPSHOT_ALL
303 // then we should let readTimestamp equals to writeTimestamp
304 if (this.visibilityLevel == VisibilityLevel.SNAPSHOT ||
305 this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) {
306 this.readTimestamp = this.writeTimestamp;
307 }
308 }
309
310 /**
311 * Sets the status for this transaction.
312 * @param status
313 * the {@link Status} to set
314 */
315 public void setStatus(Status status) {
316 this.status = status;
317 }
318
319 /**
320 * Returns the current write-set for this transaction.
321 * @return write set
322 */
323 public Set<T> getWriteSet() {
324 return writeSet;
325 }
326
327 /**
328 * Returns the current write-set for this transaction that its elements are not candidates for conflict analysis.
329 * @return conflictFreeWriteSet
330 */
331 public Set<T> getConflictFreeWriteSet() {
332 return conflictFreeWriteSet;
333 }
334
335 /**
336 * Adds an element to the transaction write-set.
337 * @param element
338 * the element to add
339 */
340 public void addWriteSetElement(T element) {
341 writeSet.add(element);
342 }
343
344 /**
345 * Adds an element to the transaction conflict free write-set.
346 * @param element
347 * the element to add
348 */
349 public void addConflictFreeWriteSetElement(T element) {
350 conflictFreeWriteSet.add(element);
351 }
352
353 @Override
354 public String toString() {
355 return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s ConflictFreeWriteSet %s",
356 Long.toHexString(getTransactionId()),
357 status,
358 startTimestamp,
359 readTimestamp,
360 writeTimestamp,
361 commitTimestamp,
362 epoch,
363 writeSet,
364 conflictFreeWriteSet);
365 }
366
367 @Override
368 public Optional<Object> getMetadata(String key) {
369 return Optional.fromNullable(metadata.get(key));
370 }
371
372 /**
373 * Expects they metadata stored under key "key" to be of the "Set" type,
374 * append "value" to the existing set or creates a new one
375 */
376 @Override
377 @SuppressWarnings("unchecked")
378 public void appendMetadata(String key, Object value) {
379 List existingValue = (List) metadata.get(key);
380 if (existingValue == null) {
381 List<Object> newList = new ArrayList<>();
382 newList.add(value);
383 metadata.put(key, newList);
384 } else {
385 existingValue.add(value);
386 }
387 }
388
389 @Override
390 public void setMetadata(String key, Object value) {
391 metadata.put(key, value);
392 }
393
394 @Override
395 public boolean isLowLatency() {
396 return isLowLatency;
397 }
398 }