1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.committable;
19
20 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
22 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
23 import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
24
25 import java.io.IOException;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 public class InMemoryCommitTable implements CommitTable {
29
30 final ConcurrentHashMap<Long, Long> table = new ConcurrentHashMap<>();
31
32 long lowWatermark;
33
34 @Override
35 public CommitTable.Writer getWriter() {
36 return new Writer();
37 }
38
39 @Override
40 public CommitTable.Client getClient() {
41 return new Client();
42 }
43
44 public class Writer implements CommitTable.Writer {
45 @Override
46 public void addCommittedTransaction(long startTimestamp, long commitTimestamp) {
47
48
49
50 table.putIfAbsent(startTimestamp, commitTimestamp);
51 }
52
53 @Override
54 public void updateLowWatermark(long lowWatermark) throws IOException {
55 InMemoryCommitTable.this.lowWatermark = lowWatermark;
56 }
57
58 @Override
59 public void flush() throws IOException {
60
61 }
62
63 @Override
64 public void clearWriteBuffer() {
65 table.clear();
66 }
67
68 @Override
69 public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
70 startTimestamp = removeCheckpointBits(startTimestamp);
71
72
73
74 return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
75 }
76 }
77
78 public class Client implements CommitTable.Client {
79 @Override
80 public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
81 startTimestamp = removeCheckpointBits(startTimestamp);
82 SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
83 Long result = table.get(startTimestamp);
84 if (result == null) {
85 f.set(Optional.<CommitTimestamp>absent());
86 } else {
87 if (result == INVALID_TRANSACTION_MARKER) {
88 f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, INVALID_TRANSACTION_MARKER, false)));
89 } else {
90 f.set(Optional.of(new CommitTimestamp(Location.COMMIT_TABLE, result, true)));
91 }
92 }
93 return f;
94 }
95
96 @Override
97 public ListenableFuture<Long> readLowWatermark() {
98 SettableFuture<Long> f = SettableFuture.create();
99 f.set(lowWatermark);
100 return f;
101 }
102
103 @Override
104 public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
105 startTimestamp = removeCheckpointBits(startTimestamp);
106 SettableFuture<Void> f = SettableFuture.create();
107 table.remove(startTimestamp);
108 f.set(null);
109 return f;
110 }
111
112 @Override
113 public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
114 startTimestamp = removeCheckpointBits(startTimestamp);
115 SettableFuture<Boolean> f = SettableFuture.create();
116 Long old = table.get(startTimestamp);
117
118
119 if (old == null) {
120
121 old = table.putIfAbsent(startTimestamp, INVALID_TRANSACTION_MARKER);
122
123 if (old == null || old == INVALID_TRANSACTION_MARKER) {
124 f.set(true);
125 return f;
126 }
127 } else {
128
129 if (old == INVALID_TRANSACTION_MARKER) {
130 f.set(true);
131 return f;
132 }
133 }
134
135
136
137 f.set(false);
138 return f;
139 }
140 }
141
142 public int countElements() {
143 return table.size();
144 }
145
146 static long removeCheckpointBits(long startTimestamp) {
147 return startTimestamp - (startTimestamp % CommitTable.MAX_CHECKPOINTS_PER_TXN);
148 }
149 }