View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.base.Function;
21  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22  import org.apache.phoenix.thirdparty.com.google.common.hash.Hashing;
23  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.Futures;
24  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.MoreExecutors;
25  import org.apache.omid.committable.CommitTable;
26  import org.apache.omid.committable.CommitTable.CommitTimestamp;
27  import org.apache.omid.metrics.Counter;
28  import org.apache.omid.metrics.MetricsRegistry;
29  import org.apache.omid.metrics.Timer;
30  import org.apache.omid.transaction.Transaction.Status;
31  import org.apache.omid.tso.client.AbortException;
32  import org.apache.omid.tso.client.CellId;
33  import org.apache.omid.tso.client.ConnectionException;
34  import org.apache.omid.tso.client.ServiceUnavailableException;
35  import org.apache.omid.tso.client.TSOProtocol;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import java.io.IOException;
40  import java.util.concurrent.ExecutionException;
41  
42  
43  import static org.apache.omid.metrics.MetricsUtils.name;
44  
45  /**
46   * Omid's base abstract implementation of the {@link TransactionManager} interface.
47   *
48   * Provides extra methods to allow transaction manager developers to perform
49   * different actions before/after the methods exposed by the {@link TransactionManager} interface.
50   *
51   * So, this abstract class must be extended by particular implementations of
52   * transaction managers related to different storage systems (HBase...)
53   */
54  public abstract class AbstractTransactionManager implements TransactionManager {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionManager.class);
57  
    /**
     * Factory used by this transaction manager to build the concrete transaction
     * objects returned from {@link #begin()} and {@link #fence(byte[])}.
     *
     * @param <T> the cell identifier type handled by the created transactions
     */
    public interface TransactionFactory<T extends CellId> {

        /**
         * Creates a new transaction object.
         *
         * @param transactionId
         *            identifier of the transaction; this manager passes the start
         *            timestamp (or the fence timestamp when fencing)
         * @param epoch
         *            epoch of the transaction; this manager passes the epoch read from
         *            the TSO client (or the fence timestamp when fencing)
         * @param tm
         *            the transaction manager creating the transaction
         * @return the newly created transaction
         */
        AbstractTransaction<T> createTransaction(long transactionId, long epoch, AbstractTransactionManager tm);

    }
63  
    // Performs the post-commit steps (shadow cell update, commit table entry removal)
    private final PostCommitActions postCommitter;
    // Client used to obtain start timestamps, epochs and fences, and to request commits
    protected final TSOProtocol tsoClient;
    // Client used to read, invalidate and delete commit table entries
    protected final CommitTable.Client commitTableClient;
    // Writer used by the low-latency commit path to add the commit entry atomically
    private final CommitTable.Writer commitTableWriter;
    // Builds the concrete transaction objects returned by begin() and fence()
    private final TransactionFactory<? extends CellId> transactionFactory;

    // Metrics
    // Latency timers wrapped around the start-timestamp, commit and fence requests
    private final Timer startTimestampTimer;
    private final Timer commitTimer;
    private final Timer fenceTimer;
    // Transaction outcome counters
    private final Counter committedTxsCounter;
    private final Counter rolledbackTxsCounter;
    private final Counter errorTxsCounter;
    private final Counter invalidatedTxsCounter;
78  
79      /**
80       * Base constructor
81       *
82       * @param metrics
83       *            instrumentation metrics
84       * @param postCommitter
85       *            post commit action executor
86       * @param tsoClient
87       *            a client for accessing functionality of the status oracle
88       * @param commitTableClient
89       *            a client for accessing functionality of the commit table
90       * @param transactionFactory
91       *            a transaction factory to create the specific transaction
92       *            objects required by the transaction manager being implemented.
93       */
94      public AbstractTransactionManager(MetricsRegistry metrics,
95                                        PostCommitActions postCommitter,
96                                        TSOProtocol tsoClient,
97                                        CommitTable.Client commitTableClient,
98                                        CommitTable.Writer commitTableWriter,
99                                        TransactionFactory<? extends CellId> transactionFactory) {
100 
101         this.tsoClient = tsoClient;
102         this.postCommitter = postCommitter;
103         this.commitTableClient = commitTableClient;
104         this.commitTableWriter = commitTableWriter;
105         this.transactionFactory = transactionFactory;
106 
107         // Metrics configuration
108         this.startTimestampTimer = metrics.timer(name("omid", "tm", "hbase", "startTimestamp", "latency"));
109         this.commitTimer = metrics.timer(name("omid", "tm", "hbase", "commit", "latency"));
110         this.fenceTimer = metrics.timer(name("omid", "tm", "hbase", "fence", "latency"));
111         this.committedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "committedTxs"));
112         this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
113         this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
114         this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs"));
115 
116     }
117 
118     /**
119      * Allows transaction manager developers to perform actions before creating a transaction.
120      * @throws TransactionManagerException in case of any issues
121      */
122     public void preBegin() throws TransactionManagerException {}
123 
124     /**
125      * @see org.apache.omid.transaction.TransactionManager#begin()
126      */
127     @Override
128     public final Transaction begin() throws TransactionException {
129 
130         try {
131             preBegin();
132 
133             long startTimestamp, epoch;
134 
135             // The loop is required for HA scenarios where we get the timestamp
136             // but when getting the epoch, the client is connected to a new TSOServer
137             // When this happen, the epoch will be larger than the startTimestamp,
138             // so we need to start the transaction again. We use the fact that epoch
139             // is always smaller or equal to a timestamp, and therefore, we first need
140             // to get the timestamp and then the epoch.
141             startTimestampTimer.start();
142             try {
143                 do {
144                     startTimestamp = tsoClient.getNewStartTimestamp().get();
145                     epoch = tsoClient.getEpoch();
146                 } while (epoch > startTimestamp);
147             } finally {
148                 startTimestampTimer.stop();
149             }
150 
151             AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(startTimestamp, epoch, this);
152 
153             postBegin(tx);
154 
155             return tx;
156         } catch (TransactionManagerException e) {
157             throw new TransactionException("An error has occured during PreBegin/PostBegin", e);
158         } catch (ExecutionException e) {
159             throw new TransactionException("Could not get new timestamp", e);
160         } catch (InterruptedException ie) {
161             Thread.currentThread().interrupt();
162             throw new TransactionException("Interrupted getting timestamp", ie);
163         }
164     }
165 
166     /**
167      * Generates hash ID for table name, this hash is later-on sent to the TSO and used for fencing
168      * @param tableName - the table name
169      * @return
170      */
171     abstract public long getHashForTable(byte[] tableName);
172 
173     /**
174      * Return the commit table client
175      * @return commitTableClient
176      */
177     public CommitTable.Client getCommitTableClient() {
178         return commitTableClient;
179     }
180 
181     /**
182      * @see org.apache.omid.transaction.TransactionManager#fence(byte[])
183      */
184     @Override
185     public final Transaction fence(byte[] tableName) throws TransactionException {
186         long fenceTimestamp;
187         long tableID = getHashForTable(tableName); Hashing.murmur3_128().newHasher().putBytes(tableName).hash().asLong();
188 
189         try {
190             fenceTimer.start();
191             try {
192                 fenceTimestamp = tsoClient.getFence(tableID).get();
193             } finally {
194                 fenceTimer.stop();
195             }
196 
197             AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(fenceTimestamp, fenceTimestamp, this);
198 
199             return tx;
200         } catch (ExecutionException e) {
201             throw new TransactionException("Could not get fence", e);
202         } catch (InterruptedException ie) {
203             Thread.currentThread().interrupt();
204             throw new TransactionException("Interrupted creating a fence", ie);
205         }
206     }
207 
208     /**
209      * Allows transaction manager developers to perform actions after having started a transaction.
210      * @param transaction
211      *            the transaction that was just created.
212      * @throws TransactionManagerException  in case of any issues
213      */
214     public void postBegin(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
215 
216     /**
217      * Allows transaction manager developers to perform actions before committing a transaction.
218      * @param transaction
219      *            the transaction that is going to be committed.
220      * @throws TransactionManagerException  in case of any issues
221      */
222     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
223 
224     /**
225      * @see org.apache.omid.transaction.TransactionManager#commit(Transaction)
226      */
227     @Override
228     public final void commit(Transaction transaction) throws RollbackException, TransactionException {
229 
230         AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
231         enforceTransactionIsInRunningState(tx);
232 
233         if (tx.isRollbackOnly()) { // Manage explicit user rollback
234             rollback(tx);
235             throw new RollbackException(tx + ": Tx was set to rollback explicitly");
236         }
237 
238         try {
239 
240             preCommit(tx);
241 
242             commitTimer.start();
243             try {
244                 if (tx.getWriteSet().isEmpty() && tx.getConflictFreeWriteSet().isEmpty()) {
245                     markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server
246                 } else {
247                     if (tsoClient.isLowLatency())
248                         commitLowLatencyTransaction(tx);
249                     else
250                         commitRegularTransaction(tx);
251                 }
252                 committedTxsCounter.inc();
253             } finally {
254                 commitTimer.stop();
255             }
256 
257             postCommit(tx);
258 
259         } catch (TransactionManagerException e) {
260             throw new TransactionException(e.getMessage(), e);
261         }
262 
263     }
264 
265     /**
266      * Allows transaction manager developers to perform actions after committing a transaction.
267      * @param transaction
268      *            the transaction that was committed.
269      * @throws TransactionManagerException in case of any issues
270      */
271     public void postCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
272 
273     /**
274      * Allows transaction manager developers to perform actions before rolling-back a transaction.
275      * @param transaction the transaction that is going to be rolled-back.
276      * @throws TransactionManagerException in case of any issues
277      */
278     public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
279 
280     /**
281      * @see org.apache.omid.transaction.TransactionManager#rollback(Transaction)
282      */
283     @Override
284     public final void rollback(Transaction transaction) throws TransactionException {
285 
286         AbstractTransaction<? extends CellId> tx = enforceAbstractTransactionAsParam(transaction);
287         enforceTransactionIsInRunningState(tx);
288 
289         try {
290 
291             preRollback(tx);
292 
293             // Make sure its commit timestamp is 0, so the cleanup does the right job
294             tx.setCommitTimestamp(0);
295             tx.setStatus(Status.ROLLEDBACK);
296 
297             postRollback(tx);
298 
299         } catch (TransactionManagerException e) {
300             throw new TransactionException(e.getMessage(), e);
301         } finally {
302             tx.cleanup();
303         }
304 
305     }
306 
307     /**
308      * Allows transaction manager developers to perform actions after rolling-back a transaction.
309      * @param transaction
310      *            the transaction that was rolled-back.
311      * @throws TransactionManagerException in case of any issues
312      */
313     public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
314 
315 
    /**
     * Releases the resources held by the concrete transaction manager.
     * Invoked from {@link #close()}, after the call that closes the TSO client.
     *
     * @throws IOException if the resources cannot be released
     */
    protected abstract void closeResources() throws IOException;
317 
318     /**
319      * @see java.io.Closeable#close()
320      */
321     @Override
322     public final void close() throws IOException {
323         tsoClient.close();
324         closeResources();
325     }
326 
327     // ----------------------------------------------------------------------------------------------------------------
328     // Helper methods
329     // ----------------------------------------------------------------------------------------------------------------
330 
331     private void enforceTransactionIsInRunningState(Transaction transaction) {
332 
333         if (transaction.getStatus() != Status.RUNNING) {
334             throw new IllegalArgumentException("Transaction was already " + transaction.getStatus());
335         }
336 
337     }
338 
339     @SuppressWarnings("unchecked")
340     // NOTE: We are sure that tx is not parametrized
341     private AbstractTransaction<? extends CellId> enforceAbstractTransactionAsParam(Transaction tx) {
342 
343         if (tx instanceof AbstractTransaction) {
344             return (AbstractTransaction<? extends CellId>) tx;
345         } else {
346             throw new IllegalArgumentException(
347                     "The transaction object passed is not an instance of AbstractTransaction");
348         }
349 
350     }
351 
352     private void markReadOnlyTransaction(AbstractTransaction<? extends CellId> readOnlyTx) {
353 
354         readOnlyTx.setStatus(Status.COMMITTED_RO);
355 
356     }
357 
358     private void commitLowLatencyTransaction(AbstractTransaction<? extends CellId> tx)
359             throws RollbackException, TransactionException {
360         try {
361 
362             long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
363             boolean committed = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(),commitTs);
364             if (!committed) {
365                 // Transaction has been invalidated by other client
366                 rollback(tx);
367                 commitTableClient.deleteCommitEntry(tx.getStartTimestamp());
368                 rolledbackTxsCounter.inc();
369                 throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
370             }
371             certifyCommitForTx(tx, commitTs);
372             updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
373 
374         } catch (ExecutionException e) {
375             if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
376                 rollback(tx);
377                 rolledbackTxsCounter.inc();
378                 throw new RollbackException(tx.getStartTimestamp() + ": Conflicts detected in writeset", e.getCause());
379             }
380 
381             if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
382                 errorTxsCounter.inc();
383                 rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
384                 throw new RollbackException(tx.getStartTimestamp() + " rolled-back precautionary", e.getCause());
385             } else {
386                 throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome", e.getCause());
387             }
388         } catch (InterruptedException e) {
389             e.printStackTrace();
390         } catch (IOException e) {
391             e.printStackTrace();
392         }
393     }
394 
    /**
     * Commits a transaction through the regular path: asks the TSO for the commit
     * timestamp, marks the transaction as committed and triggers the post-commit
     * actions.
     *
     * When the TSO cannot be reached, the commit table is consulted to find out
     * the outcome: a valid entry commits the transaction, an invalid entry rolls
     * it back, and a missing entry triggers an attempt to invalidate the
     * transaction (followed by a second look-up if the invalidation fails).
     *
     * @param tx the transaction to commit
     * @throws RollbackException
     *             if conflicts were detected in the write set or the transaction
     *             was found or made invalid in the commit table
     * @throws TransactionException
     *             if the outcome cannot be determined, the commit table cannot be
     *             read or the calling thread is interrupted
     */
    private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
            throws RollbackException, TransactionException
    {

        try {

            long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
            certifyCommitForTx(tx, commitTs);
            updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);

        } catch (ExecutionException e) {

            if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
                rollback(tx);
                rolledbackTxsCounter.inc();
                throw new RollbackException(tx.getStartTimestamp() + ": Conflicts detected in writeset", e.getCause());
            }

            // TSO unreachable: the commit may or may not have been persisted, so ask the commit table
            if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {

                errorTxsCounter.inc();
                try {
                    LOG.warn("Can't contact the TSO for receiving outcome for Tx {}. Checking Commit Table...", tx.getStartTimestamp());
                    // Check the commit table to find if the target TSO woke up in the meantime and added the commit
                    // TODO: Decide what we should we do if we can not contact the commit table
                    Optional<CommitTimestamp> commitTimestamp =
                            commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
                    if (commitTimestamp.isPresent()) {
                        if (commitTimestamp.get().isValid()) {
                            LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx.getStartTimestamp());
                            certifyCommitForTx(tx, commitTimestamp.get().getValue());
                            postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
                        } else { // Probably another Tx in a new TSO Server invalidated this transaction
                            LOG.warn("{}: Invalidated commit TS found in Commit Table. Rolling-back...", tx.getStartTimestamp());
                            rollback(tx);
                            throw new RollbackException(tx.getStartTimestamp() + " invalidated by other Tx started", e.getCause());
                        }
                    } else {
                        // No entry yet: try to invalidate the transaction before a commit entry can show up
                        LOG.warn("{}: Trying to invalidate Tx proactively in Commit Table...", tx.getStartTimestamp());
                        boolean invalidated = commitTableClient.tryInvalidateTransaction(tx.getStartTimestamp()).get();
                        if (invalidated) {
                            LOG.warn("{}: Invalidated proactively in Commit Table. Rolling-back Tx...", tx.getStartTimestamp());
                            invalidatedTxsCounter.inc();
                            rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
                            throw new RollbackException(tx.getStartTimestamp() + " rolled-back precautionary", e.getCause());
                        } else {
                            // Invalidation failed: look the entry up once more before giving up
                            LOG.warn("{}: Invalidation could NOT be completed. Re-checking Commit Table...", tx.getStartTimestamp());
                            // TODO: Decide what we should we do if we can not contact the commit table
                            commitTimestamp = commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
                            if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
                                LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx.getStartTimestamp());
                                certifyCommitForTx(tx, commitTimestamp.get().getValue());
                                postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
                            } else {
                                LOG.error("{}: Can't determine Transaction outcome", tx.getStartTimestamp());
                                throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome");
                            }
                        }
                    }
                } catch (ExecutionException e1) {
                    throw new TransactionException(tx.getStartTimestamp() + ": problem reading commitTS from Commit Table", e1);
                } catch (InterruptedException e1) {
                    // Keep the interrupt flag set for the callers
                    Thread.currentThread().interrupt();
                    throw new TransactionException(tx.getStartTimestamp() + ": interrupted while reading commitTS from Commit Table", e1);
                }
            } else {
                throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome", e.getCause());
            }
        } catch (InterruptedException ie) {
            // Keep the interrupt flag set for the callers
            Thread.currentThread().interrupt();
            throw new TransactionException(tx.getStartTimestamp() + ": interrupted during commit", ie);

        }

    }
470 
471     private void updateShadowCellsAndRemoveCommitTableEntry(final AbstractTransaction<? extends CellId> tx,
472                                                             final PostCommitActions postCommitter) {
473 
474         Futures.transform(postCommitter.updateShadowCells(tx), new Function<Void, Void>() {
475             @Override
476             public Void apply(Void aVoid) {
477                 postCommitter.removeCommitTableEntry(tx);
478                 return null;
479             }
480         }, MoreExecutors.directExecutor());
481 
482     }
483 
484     private void certifyCommitForTx(AbstractTransaction<? extends CellId> txToSetup, long commitTS) {
485 
486         txToSetup.setStatus(Status.COMMITTED);
487         txToSetup.setCommitTimestamp(commitTS);
488 
489     }
490 
491     public boolean isLowLatency() {
492         return tsoClient.isLowLatency();
493     }
494 }