View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
22  import org.apache.phoenix.thirdparty.com.google.common.collect.Iterators;
23  import org.apache.phoenix.thirdparty.com.google.common.collect.PeekingIterator;
24  import org.apache.commons.collections4.map.LRUMap;
25  import org.apache.omid.HBaseShims;
26  import org.apache.omid.committable.CommitTable;
27  import org.apache.omid.committable.CommitTable.Client;
28  import org.apache.omid.committable.CommitTable.CommitTimestamp;
29  import org.apache.omid.transaction.CellUtils;
30  import org.apache.omid.transaction.CellInfo;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.client.Get;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
37  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import java.io.IOException;
43  import java.util.ArrayList;
44  import java.util.Collections;
45  import java.util.HashMap;
46  import java.util.List;
47  import java.util.Map;
48  
49  import java.util.SortedMap;
50  import java.util.concurrent.ExecutionException;
51  
52  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
53  
54  public class CompactorScanner implements InternalScanner {
55      private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
56      private final InternalScanner internalScanner;
57      private final CommitTable.Client commitTableClient;
58  
59      private final boolean isMajorCompaction;
60      private final boolean retainNonTransactionallyDeletedCells;
61      private final long lowWatermark;
62  
63      private final Region hRegion;
64  
65      private boolean hasMoreRows = false;
66      private List<Cell> currentRowWorthValues = new ArrayList<Cell>();
67      private final LRUMap<Long ,Optional<CommitTimestamp>> commitCache;
68  
69      public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
70                              InternalScanner internalScanner,
71                              Client commitTableClient,
72                              boolean isMajorCompaction,
73                              boolean preserveNonTransactionallyDeletedCells) throws IOException {
74          this.internalScanner = internalScanner;
75          this.commitTableClient = commitTableClient;
76          this.isMajorCompaction = isMajorCompaction;
77          this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
78          this.lowWatermark = getLowWatermarkFromCommitTable();
79          // Obtain the table in which the scanner is going to operate
80          this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
81          commitCache = new LRUMap<>(1000);
82          LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
83                  lowWatermark, hRegion.getRegionInfo());
84      }
85  
86      @Override
87      public boolean next(List<Cell> results) throws IOException {
88          return next(results, -1);
89      }
90  
91      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
92          int limit = scannerContext.getBatchLimit();
93          return next(result, limit);
94      }
95  
96      public boolean next(List<Cell> result, int limit) throws IOException {
97  
98          if (currentRowWorthValues.isEmpty()) {
99              // 1) Read next row
100             List<Cell> scanResult = new ArrayList<Cell>();
101             hasMoreRows = internalScanner.next(scanResult);
102             if (LOG.isTraceEnabled()) {
103                 LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
104             }
105             // 2) Traverse result list separating normal cells from shadow
106             // cells and building a map to access easily the shadow cells.
107             SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
108 
109             // 3) traverse the list of row key values isolated before and
110             // check which ones should be discarded
111             Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
112             PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
113                     = Iterators.peekingIterator(cellToSc.entrySet().iterator());
114             while (iter.hasNext()) {
115                 Map.Entry<Cell, Optional<Cell>> entry = iter.next();
116                 Cell cell = entry.getKey();
117                 Optional<Cell> shadowCellOp = entry.getValue();
118 
119                 if (cell.getTimestamp() > lowWatermark) {
120                     retain(currentRowWorthValues, cell, shadowCellOp);
121                     continue;
122                 }
123 
124                 if (shouldRetainNonTransactionallyDeletedCell(cell)) {
125                     retain(currentRowWorthValues, cell, shadowCellOp);
126                     continue;
127                 }
128 
129                 // During a minor compaction the coprocessor may only see a
130                 // subset of store files and may not have the all the versions
131                 // of a cell available for consideration. Therefore, if it
132                 // deletes a cell with a tombstone during a minor compaction,
133                 // an older version of the cell may become visible again. So,
134                 // we have to remove tombstones only in major compactions.
135                 if (isMajorCompaction) {
136                     // Strong assumption that family delete cells arrive first before any other column
137                     if (CellUtils.isTombstone(cell)) {
138                         if (shadowCellOp.isPresent()) {
139                             skipToNextColumn(cell, iter);
140                         } else {
141                             Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
142                             // Clean the cell only if it is valid
143                             if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
144                                 skipToNextColumn(cell, iter);
145                             }
146                         }
147                         continue;
148                     }
149                 }
150 
151                 if (shadowCellOp.isPresent()) {
152                     saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
153                 } else {
154                     Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
155                     if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
156                         // Build the missing shadow cell...
157                         byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
158                         Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
159                         saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
160                     } else {
161                         LOG.trace("Discarding cell {}", cell);
162                     }
163                 }
164             }
165             retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
166 
167             // 4) Sort the list
168             Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
169         }
170 
171         // Chomp current row worth values up to the limit
172         if (currentRowWorthValues.size() <= limit || limit == -1) {
173             result.addAll(currentRowWorthValues);
174             currentRowWorthValues.clear();
175         } else {
176             result.addAll(currentRowWorthValues.subList(0, limit));
177             currentRowWorthValues.subList(0, limit).clear();
178         }
179         LOG.trace("Results to preserve {}", result);
180 
181         return hasMoreRows;
182     }
183 
184     @Override
185     public void close() throws IOException {
186         internalScanner.close();
187     }
188 
189     // ----------------------------------------------------------------------------------------------------------------
190     // Helper methods
191     // ----------------------------------------------------------------------------------------------------------------
192 
193     @VisibleForTesting
194     public boolean shouldRetainNonTransactionallyDeletedCell(Cell cell) {
195         return (CellUtil.isDelete(cell) || CellUtil.isDeleteFamily(cell))
196                 &&
197                 retainNonTransactionallyDeletedCells;
198     }
199 
200     private void saveLastTimestampedCell(Map<String, CellInfo> lastCells, Cell cell, Cell shadowCell) {
201         String cellKey = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
202                 + ":"
203                 + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
204         LOG.trace("Cell Key: {}", cellKey);
205 
206         if (!lastCells.containsKey(cellKey)) {
207             lastCells.put(cellKey, new CellInfo(cell, shadowCell));
208         } else {
209             if (lastCells.get(cellKey).getTimestamp() < cell.getTimestamp()) {
210                 lastCells.put(cellKey, new CellInfo(cell, shadowCell));
211             } else {
212                 LOG.trace("Forgetting old cell {}", cell);
213             }
214         }
215     }
216 
217     private long getLowWatermarkFromCommitTable() throws IOException {
218         try {
219             LOG.trace("About to read log watermark from commit table");
220             return commitTableClient.readLowWatermark().get();
221         } catch (InterruptedException ie) {
222             Thread.currentThread().interrupt();
223             LOG.warn("Interrupted getting low watermark from commit table", ie);
224             throw new IOException("Interrupted getting low watermark from commit table");
225         } catch (ExecutionException ee) {
226             LOG.warn("Problem getting low watermark from commit table");
227             throw new IOException("Problem getting low watermark from commit table", ee.getCause());
228         }
229     }
230 
231 
232     private Result getShadowCell(byte[] row, byte[] family, byte[] qualifier, long timestamp) throws IOException {
233         Get g = new Get(row);
234         g.addColumn(family, qualifier);
235         g.setTimeStamp(timestamp);
236         Result r = hRegion.get(g);
237         return r;
238     }
239 
240 
241     private Optional<CommitTimestamp> getCommitTimestampWithRaces(Cell cell) throws IOException {
242         try {
243             byte[] family = CellUtil.cloneFamily(cell);
244             byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
245                     cell.getQualifierOffset(),
246                     cell.getQualifierLength());
247             // 2) Then check the commit table
248             Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
249             if (ct.isPresent()) {
250                 if (ct.get().isValid()) {
251                     return Optional.of(ct.get());
252                 }
253                 // If invalid still should check sc because maybe we got falsely invalidated by another compaction or ll client
254             }
255 
256             // 3) Read from shadow cell
257             Result r = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
258             if (r.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
259                 Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
260                         Bytes.toLong(r.getValue(family, qualifier)), true));
261                 return retval;
262             }
263 
264             // [OMID-146] - we have to invalidate a transaction if it hasn't reached the commit table
265             // 4) invalidate the entry
266             Boolean invalidated = commitTableClient.tryInvalidateTransaction(cell.getTimestamp()).get();
267             if (invalidated) {
268                 // If we are running lowLatency Omid, we could have managed to invalidate a ct entry,
269                 // but the committing client already wrote to shadow cells:
270                 Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
271                 if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
272                     Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
273                             Bytes.toLong(r2.getValue(family, qualifier)), true));
274                     commitTableClient.deleteCommitEntry(cell.getTimestamp());
275                     return retval;
276                 }
277                 return Optional.absent();
278             }
279 
280             // 5) We did not manage to invalidate the transactions then check the commit table
281             Optional<CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
282             if (ct2.isPresent()) {
283                 return Optional.of(ct2.get());
284             }
285 
286             // 6) Read from shadow cell
287             Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
288             if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
289                 Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
290                         Bytes.toLong(r2.getValue(family, qualifier)), true));
291                 return retval;
292             }
293 
294         } catch (InterruptedException e) {
295             Thread.currentThread().interrupt();
296             throw new IOException("Interrupted while getting commit timestamp from commit table");
297         } catch (ExecutionException e) {
298             throw new IOException("Error getting commit timestamp from commit table", e);
299         }
300 
301         return Optional.absent();
302     }
303 
304     private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
305 
306         // 1) First check the cache
307         Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
308         if (cachedValue != null) {
309             return cachedValue;
310         }
311         Optional<CommitTimestamp> value = getCommitTimestampWithRaces(cell);
312         commitCache.put(cell.getTimestamp(), value);
313         return value;
314     }
315 
316     private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
317         LOG.trace("Retaining cell {}", cell);
318         result.add(cell);
319         if (shadowCell.isPresent()) {
320             LOG.trace("...with shadow cell {}", cell, shadowCell.get());
321             result.add(shadowCell.get());
322         } else {
323             LOG.trace("...without shadow cell! (TS is above Low Watermark)");
324         }
325     }
326 
327     private void retainLastTimestampedCellsSaved(List<Cell> result, Map<String, CellInfo> lastTimestampedCellsInRow) {
328         for (CellInfo cellInfo : lastTimestampedCellsInRow.values()) {
329             LOG.trace("Retaining last cell {} with shadow cell {}", cellInfo.getCell(), cellInfo.getShadowCell());
330             result.add(cellInfo.getCell());
331             result.add(cellInfo.getShadowCell());
332         }
333     }
334 
335     private void skipToNextColumn(Cell cell, PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter) {
336         boolean isFamilyDelete = CellUtils.isFamilyDeleteCell(cell);
337         while (iter.hasNext()
338                 && CellUtil.matchingFamily(iter.peek().getKey(), cell)
339                 && (CellUtil.matchingQualifier(iter.peek().getKey(), cell) || isFamilyDelete)) {
340             iter.next();
341         }
342     }
343 
344 }