View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21  
22  
23  import org.apache.hadoop.hbase.client.Connection;
24  import org.apache.omid.committable.CommitTable;
25  import org.apache.omid.committable.hbase.HBaseCommitTable;
26  import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.DoNotRetryIOException;
29  import org.apache.omid.HBaseShims;
30  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
31  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.regionserver.CompactorScanner;
34  import org.apache.hadoop.hbase.regionserver.InternalScanner;
35  import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
36  import org.apache.hadoop.hbase.regionserver.ScanType;
37  import org.apache.hadoop.hbase.regionserver.Store;
38  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
39  
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import java.io.IOException;
44  import java.util.Queue;
45  import java.util.concurrent.ConcurrentLinkedQueue;
46  
47  import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
48  
49  /**
50   * Garbage collector for stale data: triggered upon HBase
51   * compactions, it removes data from uncommitted transactions
52   * older than the low watermark using a special scanner
53   */
54  public class OmidCompactor extends BaseRegionObserver {
55  
56      private static final Logger LOG = LoggerFactory.getLogger(OmidCompactor.class);
57  
58      private static final String HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY
59              = "omid.hbase.compactor.retain.tombstones";
60      private static final boolean HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT = true;
61  
62      final static String OMID_COMPACTABLE_CF_FLAG = "OMID_ENABLED";
63  
64      private boolean enableCompactorForAllFamilies = false;
65  
66      private HBaseCommitTableConfig commitTableConf = null;
67      private RegionCoprocessorEnvironment env = null;
68  
69      @VisibleForTesting
70      CommitTable.Client commitTableClient;
71  
72      // When compacting, if a cell which has been marked by HBase as Delete or
73      // Delete Family (that is, non-transactionally deleted), we allow the user
74      // to decide what the compactor scanner should do with it: retain it or not
75      // If retained, the deleted cell will appear after a minor compaction, but
76      // will be deleted anyways after a major one
77      private boolean retainNonTransactionallyDeletedCells;
78  
79      private Connection connection;
80  
81      public OmidCompactor() {
82          this(false);
83      }
84  
85      public OmidCompactor(boolean enableCompactorForAllFamilies) {
86          LOG.info("Compactor coprocessor initialized");
87          this.enableCompactorForAllFamilies = enableCompactorForAllFamilies;
88      }
89  
90      @Override
91      public void start(CoprocessorEnvironment env) throws IOException {
92          LOG.info("Starting compactor coprocessor");
93          commitTableConf = new HBaseCommitTableConfig();
94          String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
95          if (commitTableName != null) {
96              commitTableConf.setTableName(commitTableName);
97          }
98  
99          connection = RegionConnectionFactory
100                 .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, (RegionCoprocessorEnvironment) env);
101         commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
102         retainNonTransactionallyDeletedCells =
103                 env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
104                         HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
105         LOG.info("Compactor coprocessor started");
106     }
107 
108     @Override
109     public void stop(CoprocessorEnvironment e) throws IOException {
110         LOG.info("Stopping compactor coprocessor");
111         LOG.info("Compactor coprocessor stopped");
112     }
113 
114 
115 
116     @Override
117     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
118                                       Store store,
119                                       InternalScanner scanner,
120                                       ScanType scanType,
121                                       CompactionRequest request) throws IOException {
122         boolean omidCompactable;
123         try {
124             if (enableCompactorForAllFamilies) {
125                 omidCompactable = true;
126             } else {
127 
128                 omidCompactable = HBaseShims.OmidCompactionEnabled(env, store, OMID_COMPACTABLE_CF_FLAG);
129             }
130 
131             // only column families tagged as compactable are compacted
132             // with omid compactor
133             if (!omidCompactable) {
134                 return scanner;
135             } else {
136                 boolean isMajorCompaction = request.isMajor();
137                 return new CompactorScanner(env,
138                         scanner,
139                         commitTableClient,
140                         isMajorCompaction,
141                         retainNonTransactionallyDeletedCells);
142             }
143         } catch (IOException e) {
144             throw e;
145         } catch (Exception e) {
146             throw new DoNotRetryIOException(e);
147         }
148     }
149 }