1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import com.google.protobuf.InvalidProtocolBufferException;
21
22 import org.apache.hadoop.hbase.client.Connection;
23 import org.apache.hadoop.hbase.client.Scan;
24
25 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
26 import org.apache.hadoop.hbase.filter.Filter;
27 import org.apache.hadoop.hbase.regionserver.RegionScanner;
28
29
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.omid.committable.CommitTable;
32 import org.apache.omid.committable.hbase.HBaseCommitTable;
33 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
34 import org.apache.omid.proto.TSOProto;
35 import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
36 import org.apache.omid.HBaseShims;
37 import org.apache.hadoop.hbase.Cell;
38 import org.apache.hadoop.hbase.CoprocessorEnvironment;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
42 import org.apache.hadoop.hbase.regionserver.RegionAccessWrapper;
43 import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import java.io.IOException;
48
49 import java.util.HashSet;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.Queue;
53 import java.util.concurrent.ConcurrentHashMap;
54 import java.util.concurrent.ConcurrentLinkedQueue;
55
56 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
57
58
59
60
61 public class OmidSnapshotFilter extends BaseRegionObserver {
62
63 private static final Logger LOG = LoggerFactory.getLogger(OmidSnapshotFilter.class);
64
65 private HBaseCommitTableConfig commitTableConf = null;
66 private RegionCoprocessorEnvironment env = null;
67 private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
68 private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
69 private CommitTable.Client inMemoryCommitTable = null;
70 private CommitTable.Client commitTableClient;
71 private Connection connection;
72
73 public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
74 LOG.info("Compactor coprocessor initialized");
75 this.inMemoryCommitTable = commitTableClient;
76 }
77
78 public OmidSnapshotFilter() {
79 LOG.info("Compactor coprocessor initialized via empty constructor");
80 }
81
82 @Override
83 public void start(CoprocessorEnvironment env) throws IOException {
84 LOG.info("Starting snapshot filter coprocessor");
85 this.env = (RegionCoprocessorEnvironment)env;
86 commitTableConf = new HBaseCommitTableConfig();
87 String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
88 if (commitTableName != null) {
89 commitTableConf.setTableName(commitTableName);
90 }
91 connection = RegionConnectionFactory
92 .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env);
93 commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
94 LOG.info("Snapshot filter started");
95 }
96
97 @Override
98 public void stop(CoprocessorEnvironment e) throws IOException {
99 LOG.info("stopping Snapshot filter");
100 LOG.info("Snapshot filter stopped");
101 }
102
103
104
105 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) {
106 SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(get);
107 if (snapshotFilter != null) {
108 snapshotFilterQueue.add(snapshotFilter);
109 }
110 }
111
112
113
114 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
115 throws IOException {
116
117 if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return;
118 boolean isLowLatency = Bytes.toBoolean(get.getAttribute(CellUtils.LL_ATTRIBUTE));
119 HBaseTransaction hbaseTransaction = getHBaseTransaction(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE),
120 isLowLatency);
121 SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
122 snapshotFilterMap.put(get, snapshotFilter);
123
124 get.setMaxVersions();
125 Filter newFilter = TransactionFilters.getVisibilityFilter(get.getFilter(),
126 snapshotFilter, hbaseTransaction);
127 get.setFilter(newFilter);
128 }
129
130 private SnapshotFilterImpl getSnapshotFilter(ObserverContext<RegionCoprocessorEnvironment> e)
131 throws IOException {
132 SnapshotFilterImpl snapshotFilter= snapshotFilterQueue.poll();
133 if (snapshotFilter == null) {
134 RegionAccessWrapper regionAccessWrapper =
135 new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(e.getEnvironment()));
136 snapshotFilter = new SnapshotFilterImpl(regionAccessWrapper, initAndGetCommitTableClient());
137 }
138 return snapshotFilter;
139 }
140
141
142
143 public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
144 Scan scan,
145 RegionScanner s) throws IOException {
146 preScannerOpen(e,scan);
147 return s;
148 }
149
150
151
152 public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
153 Scan scan) throws IOException {
154 byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
155
156 if (byteTransaction == null) {
157 return;
158 }
159 boolean isLowLatency = Bytes.toBoolean(scan.getAttribute(CellUtils.LL_ATTRIBUTE));
160 HBaseTransaction hbaseTransaction = getHBaseTransaction(byteTransaction, isLowLatency);
161 SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
162
163 scan.setMaxVersions();
164 Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(),
165 snapshotFilter, hbaseTransaction);
166 scan.setFilter(newFilter);
167 return;
168 }
169
170 private HBaseTransaction getHBaseTransaction(byte[] byteTransaction, boolean isLowLatency)
171 throws InvalidProtocolBufferException {
172 TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);
173 long id = transaction.getTimestamp();
174 long readTs = transaction.getReadTimestamp();
175 long epoch = transaction.getEpoch();
176 VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel());
177
178 return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null,
179 isLowLatency);
180
181 }
182
183 private CommitTable.Client initAndGetCommitTableClient() throws IOException {
184 if (inMemoryCommitTable != null) {
185 return inMemoryCommitTable;
186 }
187 return commitTableClient;
188 }
189
190 }