View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.omid.metrics.MetricsUtils.name;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.ExecutionException;
29  
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.client.Connection;
32  import org.apache.hadoop.hbase.client.Mutation;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.client.Table;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.omid.committable.CommitTable;
37  import org.apache.omid.metrics.MetricsRegistry;
38  import org.apache.omid.metrics.Timer;
39  import org.apache.omid.tso.client.CellId;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ListenableFuture;
44  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
45  
46  public class HBaseSyncPostCommitter implements PostCommitActions {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(HBaseSyncPostCommitter.class);
49  
50      private final MetricsRegistry metrics;
51      private final CommitTable.Client commitTableClient;
52  
53      private final Timer commitTableUpdateTimer;
54      private final Timer shadowCellsUpdateTimer;
55      static final int MAX_BATCH_SIZE=1000;
56      private final Connection connection;
57  
58      public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
59                                    Connection connection) {
60          this.metrics = metrics;
61          this.commitTableClient = commitTableClient;
62  
63          this.commitTableUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "commitTableUpdate", "latency"));
64          this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
65          this.connection = connection;
66      }
67  
68      private void flushMutations(TableName tableName, List<Mutation> mutations) throws IOException, InterruptedException {
69          try (Table table = connection.getTable(tableName)){
70              table.batch(mutations, new Object[mutations.size()]);
71          }
72  
73      }
74  
75      private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
76                                 Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
77          Put put = new Put(cell.getRow());
78          put.addColumn(cell.getFamily(),
79                  CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
80                  cell.getTimestamp(),
81                  Bytes.toBytes(tx.getCommitTimestamp()));
82  
83          TableName table = cell.getTable().getHTable().getName();
84          List<Mutation> tableMutations = mutations.get(table);
85          if (tableMutations == null) {
86              ArrayList<Mutation> newList = new ArrayList<>();
87              newList.add(put);
88              mutations.put(table, newList);
89          } else {
90              tableMutations.add(put);
91              if (tableMutations.size() > MAX_BATCH_SIZE) {
92                  flushMutations(table, tableMutations);
93                  mutations.remove(table);
94              }
95          }
96      }
97  
98      @Override
99      public ListenableFuture<Void> updateShadowCells(AbstractTransaction<? extends CellId> transaction) {
100 
101         SettableFuture<Void> updateSCFuture = SettableFuture.create();
102 
103         HBaseTransaction tx = HBaseTransactionManager.enforceHBaseTransactionAsParam(transaction);
104 
105         shadowCellsUpdateTimer.start();
106         try {
107             Map<TableName,List<Mutation>> mutations = new HashMap<>();
108             // Add shadow cells
109             for (HBaseCellId cell : tx.getWriteSet()) {
110                 addShadowCell(cell, tx, updateSCFuture, mutations);
111             }
112 
113             for (HBaseCellId cell : tx.getConflictFreeWriteSet()) {
114                 addShadowCell(cell, tx, updateSCFuture, mutations);
115             }
116 
117             for (Map.Entry<TableName,List<Mutation>> entry: mutations.entrySet()) {
118                 flushMutations(entry.getKey(), entry.getValue());
119             }
120 
121             //Only if all is well we set to null and delete commit entry from commit table
122             updateSCFuture.set(null);
123         } catch (IOException | InterruptedException e) {
124             LOG.warn("{}: Error inserting shadow cells", tx, e);
125             updateSCFuture.setException(
126                     new TransactionManagerException(tx + ": Error inserting shadow cells ", e));
127         } finally {
128             shadowCellsUpdateTimer.stop();
129         }
130 
131         return updateSCFuture;
132 
133     }
134 
135     @Override
136     public ListenableFuture<Void> removeCommitTableEntry(AbstractTransaction<? extends CellId> transaction) {
137 
138         SettableFuture<Void> updateSCFuture = SettableFuture.create();
139 
140         HBaseTransaction tx = HBaseTransactionManager.enforceHBaseTransactionAsParam(transaction);
141 
142         commitTableUpdateTimer.start();
143 
144         try {
145             commitTableClient.deleteCommitEntry(tx.getStartTimestamp()).get();
146             updateSCFuture.set(null);
147         } catch (InterruptedException e) {
148             Thread.currentThread().interrupt();
149             LOG.warn("{}: interrupted during commit table entry delete", tx, e);
150             updateSCFuture.setException(
151                     new TransactionManagerException(tx + ": interrupted during commit table entry delete"));
152         } catch (ExecutionException e) {
153             LOG.warn("{}: can't remove commit table entry", tx, e);
154             updateSCFuture.setException(new TransactionManagerException(tx + ": can't remove commit table entry"));
155         } finally {
156             commitTableUpdateTimer.stop();
157         }
158 
159         return updateSCFuture;
160 
161     }
162 
163 }