1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import java.io.IOException;
21 import java.util.HashSet;
22 import java.util.Map;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executors;
25
26 import org.apache.hadoop.hbase.client.Connection;
27 import org.apache.hadoop.hbase.client.ConnectionFactory;
28 import org.apache.hadoop.hbase.client.Get;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.omid.committable.CommitTable;
32 import org.apache.omid.committable.hbase.HBaseCommitTable;
33 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
34 import org.apache.omid.tools.hbase.HBaseLogin;
35 import org.apache.omid.tso.client.CellId;
36 import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
37 import org.apache.omid.tso.client.TSOClient;
38 import org.apache.omid.tso.client.TSOProtocol;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
43 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
44 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
45 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
46
47 public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
48
/** Logger shared by this class and its nested classes. */
private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
/** HBase connection received through the constructor; closed by {@code closeResources()}. */
private final Connection connection;
51
52 private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
53
54 @Override
55 public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {
56
57 return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(),
58 tm, tm.isLowLatency());
59
60 }
61
62 }
63
64
65
66
67
68 public static TransactionManager newInstance() throws IOException, InterruptedException {
69 return newInstance(new HBaseOmidClientConfiguration());
70 }
71
72 public static TransactionManager newInstance(HBaseOmidClientConfiguration configuration)
73 throws IOException, InterruptedException {
74
75 HBaseLogin.loginIfNeeded(configuration);
76 return builder(configuration).build();
77 }
78
79 public static class Builder {
80
81
82 private final HBaseOmidClientConfiguration hbaseOmidClientConf;
83
84
85 private Optional<TSOProtocol> tsoClient = Optional.absent();
86 private Optional<CommitTable.Client> commitTableClient = Optional.absent();
87 private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
88 private Optional<PostCommitActions> postCommitter = Optional.absent();
89
90 public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
91 this.hbaseOmidClientConf = hbaseOmidClientConf;
92 }
93
94 public Builder tsoClient(TSOProtocol tsoClient) {
95 this.tsoClient = Optional.of(tsoClient);
96 return this;
97 }
98
99 public Builder commitTableClient(CommitTable.Client client) {
100 this.commitTableClient = Optional.of(client);
101 return this;
102 }
103
104 public Builder commitTableWriter(CommitTable.Writer writer) {
105 this.commitTableWriter = Optional.of(writer);
106 return this;
107 }
108
109 Builder postCommitter(PostCommitActions postCommitter) {
110 this.postCommitter = Optional.of(postCommitter);
111 return this;
112 }
113
114 public HBaseTransactionManager build() throws IOException, InterruptedException {
115
116 Connection connection = ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration());
117
118 CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient(connection)).get();
119 CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter(connection)).get();
120 PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient, connection)).get();
121 TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
122
123 return new HBaseTransactionManager(hbaseOmidClientConf,
124 postCommitter,
125 tsoClient,
126 commitTableClient,
127 commitTableWriter,
128 new HBaseTransactionFactory(),
129 connection);
130 }
131
132 private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
133 return Optional.of((TSOProtocol) TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
134 }
135
136
137 private Optional<CommitTable.Client> buildCommitTableClient(Connection connection) throws IOException {
138 HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
139 commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
140 CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
141 return Optional.of(commitTable.getClient());
142 }
143
144 private Optional<CommitTable.Writer> buildCommitTableWriter(Connection connection) throws IOException {
145 HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
146 commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
147 CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
148 return Optional.of(commitTable.getWriter());
149 }
150
151 private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient, Connection connection) {
152
153 PostCommitActions postCommitter;
154 PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
155 commitTableClient, connection);
156 switch(hbaseOmidClientConf.getPostCommitMode()) {
157 case ASYNC:
158 ListeningExecutorService postCommitExecutor =
159 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
160 new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
161 postCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
162 break;
163 case SYNC:
164 default:
165 postCommitter = syncPostCommitter;
166 break;
167 }
168
169 return Optional.of(postCommitter);
170 }
171
172 }
173
174 public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
175 return new Builder(hbaseOmidClientConf);
176 }
177
/**
 * Passes the collaborators, together with the metrics taken from the configuration, to the
 * {@link AbstractTransactionManager} constructor and stores the HBase connection.
 *
 * Private: instances are created through {@link Builder#build()}.
 */
private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConfiguration,
                                PostCommitActions postCommitter,
                                TSOProtocol tsoClient,
                                CommitTable.Client commitTableClient,
                                CommitTable.Writer commitTableWriter,
                                HBaseTransactionFactory hBaseTransactionFactory,
                                Connection connection) {
    super(hBaseOmidClientConfiguration.getMetrics(), postCommitter, tsoClient,
          commitTableClient, commitTableWriter, hBaseTransactionFactory);

    // Kept so that closeResources() can close it.
    this.connection = connection;
}
193
194
195
196
197 @Override
198 public void closeResources() throws IOException {
199 connection.close();
200 }
201
202 @Override
203 public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
204 try {
205
206 HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
207 hBaseTx.flushTables();
208 } catch (IOException e) {
209 throw new TransactionManagerException("Exception while flushing writes", e);
210 }
211 }
212
213 @Override
214 public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
215 try {
216
217 HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
218 hBaseTx.flushTables();
219 } catch (IOException e) {
220 throw new TransactionManagerException("Exception while flushing writes", e);
221 }
222 }
223
224 @Override
225 public long getHashForTable(byte[] tableName) {
226 return HBaseCellId.getHasher().putBytes(tableName).hash().asLong();
227 }
228
229 @Override
230 public long getLowWatermark() throws TransactionException {
231 try {
232 return commitTableClient.readLowWatermark().get();
233 } catch (ExecutionException ee) {
234 throw new TransactionException("Error reading low watermark", ee.getCause());
235 } catch (InterruptedException ie) {
236 Thread.currentThread().interrupt();
237 throw new TransactionException("Interrupted reading low watermark", ie);
238 }
239 }
240
241
242
243
244
245 static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? extends CellId> tx) {
246
247 if (tx instanceof HBaseTransaction) {
248 return (HBaseTransaction) tx;
249 } else {
250 throw new IllegalArgumentException(
251 "The transaction object passed is not an instance of HBaseTransaction");
252 }
253
254 }
255
256 public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
257 tsoClient.setConflictDetectionLevel(conflictDetectionLevel);
258 }
259
260 public ConflictDetectionLevel getConflictDetectionLevel() {
261 return tsoClient.getConflictDetectionLevel();
262 }
263
264 static class CommitTimestampLocatorImpl implements CommitTimestampLocator {
265
266 private HBaseCellId hBaseCellId;
267 private final Map<Long, Long> commitCache;
268 private TableAccessWrapper tableAccessWrapper;
269
270 CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache, TableAccessWrapper tableAccessWrapper) {
271 this.hBaseCellId = hBaseCellId;
272 this.commitCache = commitCache;
273 this.tableAccessWrapper = tableAccessWrapper;
274 }
275
276 CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache) {
277 this.hBaseCellId = hBaseCellId;
278 this.commitCache = commitCache;
279 this.tableAccessWrapper = null;
280 this.tableAccessWrapper = new HTableAccessWrapper(hBaseCellId.getTable().getHTable(), hBaseCellId.getTable().getHTable());
281 }
282
283 @Override
284 public Optional<Long> readCommitTimestampFromCache(long startTimestamp) {
285 if (commitCache.containsKey(startTimestamp)) {
286 return Optional.of(commitCache.get(startTimestamp));
287 }
288 return Optional.absent();
289 }
290
291 @Override
292 public Optional<Long> readCommitTimestampFromShadowCell(long startTimestamp) throws IOException {
293
294 Get get = new Get(hBaseCellId.getRow());
295 byte[] family = hBaseCellId.getFamily();
296 byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(hBaseCellId.getQualifier());
297 get.addColumn(family, shadowCellQualifier);
298 get.setMaxVersions(1);
299 get.setTimeStamp(startTimestamp);
300 Result result = tableAccessWrapper.get(get);
301 if (result.containsColumn(family, shadowCellQualifier)) {
302 return Optional.of(Bytes.toLong(result.getValue(family, shadowCellQualifier)));
303 }
304 return Optional.absent();
305 }
306
307 }
308
309 }