1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21
22
23 import org.apache.hadoop.hbase.client.Connection;
24 import org.apache.omid.committable.CommitTable;
25 import org.apache.omid.committable.hbase.HBaseCommitTable;
26 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
27 import org.apache.hadoop.hbase.CoprocessorEnvironment;
28 import org.apache.hadoop.hbase.DoNotRetryIOException;
29 import org.apache.omid.HBaseShims;
30 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
31 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33 import org.apache.hadoop.hbase.regionserver.CompactorScanner;
34 import org.apache.hadoop.hbase.regionserver.InternalScanner;
35 import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
36 import org.apache.hadoop.hbase.regionserver.ScanType;
37 import org.apache.hadoop.hbase.regionserver.Store;
38 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
39
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import java.io.IOException;
44 import java.util.Queue;
45 import java.util.concurrent.ConcurrentLinkedQueue;
46
47 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
48
49
50
51
52
53
54 public class OmidCompactor extends BaseRegionObserver {
55
56 private static final Logger LOG = LoggerFactory.getLogger(OmidCompactor.class);
57
58 private static final String HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY
59 = "omid.hbase.compactor.retain.tombstones";
60 private static final boolean HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT = true;
61
62 final static String OMID_COMPACTABLE_CF_FLAG = "OMID_ENABLED";
63
64 private boolean enableCompactorForAllFamilies = false;
65
66 private HBaseCommitTableConfig commitTableConf = null;
67 private RegionCoprocessorEnvironment env = null;
68
69 @VisibleForTesting
70 CommitTable.Client commitTableClient;
71
72
73
74
75
76
77 private boolean retainNonTransactionallyDeletedCells;
78
79 private Connection connection;
80
81 public OmidCompactor() {
82 this(false);
83 }
84
85 public OmidCompactor(boolean enableCompactorForAllFamilies) {
86 LOG.info("Compactor coprocessor initialized");
87 this.enableCompactorForAllFamilies = enableCompactorForAllFamilies;
88 }
89
90 @Override
91 public void start(CoprocessorEnvironment env) throws IOException {
92 LOG.info("Starting compactor coprocessor");
93 commitTableConf = new HBaseCommitTableConfig();
94 String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
95 if (commitTableName != null) {
96 commitTableConf.setTableName(commitTableName);
97 }
98
99 connection = RegionConnectionFactory
100 .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, (RegionCoprocessorEnvironment) env);
101 commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
102 retainNonTransactionallyDeletedCells =
103 env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
104 HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
105 LOG.info("Compactor coprocessor started");
106 }
107
108 @Override
109 public void stop(CoprocessorEnvironment e) throws IOException {
110 LOG.info("Stopping compactor coprocessor");
111 LOG.info("Compactor coprocessor stopped");
112 }
113
114
115
116 @Override
117 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
118 Store store,
119 InternalScanner scanner,
120 ScanType scanType,
121 CompactionRequest request) throws IOException {
122 boolean omidCompactable;
123 try {
124 if (enableCompactorForAllFamilies) {
125 omidCompactable = true;
126 } else {
127
128 omidCompactable = HBaseShims.OmidCompactionEnabled(env, store, OMID_COMPACTABLE_CF_FLAG);
129 }
130
131
132
133 if (!omidCompactable) {
134 return scanner;
135 } else {
136 boolean isMajorCompaction = request.isMajor();
137 return new CompactorScanner(env,
138 scanner,
139 commitTableClient,
140 isMajorCompaction,
141 retainNonTransactionallyDeletedCells);
142 }
143 } catch (IOException e) {
144 throw e;
145 } catch (Exception e) {
146 throw new DoNotRetryIOException(e);
147 }
148 }
149 }