1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
21 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
22 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
23 import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.ExecutionException;
34
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.omid.committable.CommitTable;
46 import org.apache.omid.committable.CommitTable.CommitTimestamp;
47 import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
48 import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import org.apache.phoenix.thirdparty.com.google.common.base.Function;
53 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
54 import org.apache.phoenix.thirdparty.com.google.common.base.Predicate;
55 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
56 import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
57 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
58 import org.apache.phoenix.thirdparty.com.google.common.collect.Multimaps;
59
60 public class SnapshotFilterImpl implements SnapshotFilter {
61
62 private static Logger LOG = LoggerFactory.getLogger(SnapshotFilterImpl.class);
63
64 private TableAccessWrapper tableAccessWrapper;
65
66 private CommitTable.Client commitTableClient;
67
68 public TableAccessWrapper getTableAccessWrapper() {
69 return tableAccessWrapper;
70 }
71
/**
 * Creates a filter from both collaborators. Either argument may be null and supplied later
 * through the corresponding setter.
 *
 * @param tableAccessWrapper table accessor used for reads and shadow cell healing
 * @param commitTableClient  client used for commit table lookups
 * @throws IOException declared for callers; this body itself does not throw it
 */
public SnapshotFilterImpl(TableAccessWrapper tableAccessWrapper, CommitTable.Client commitTableClient) throws IOException {
    this.commitTableClient = commitTableClient;
    this.tableAccessWrapper = tableAccessWrapper;
}
76
/**
 * Creates a filter that only knows the table accessor; the commit table client starts as null.
 *
 * @param tableAccessWrapper table accessor used for reads and shadow cell healing
 * @throws IOException declared by the delegated constructor
 */
public SnapshotFilterImpl(TableAccessWrapper tableAccessWrapper) throws IOException {
    this(tableAccessWrapper, (CommitTable.Client) null);
}
80
/**
 * Creates a filter that only knows the commit table client; the table accessor starts as null.
 *
 * @param commitTableClient client used for commit table lookups
 * @throws IOException declared by the delegated constructor
 */
public SnapshotFilterImpl(CommitTable.Client commitTableClient) throws IOException {
    this((TableAccessWrapper) null, commitTableClient);
}
84
85 void setTableAccessWrapper(TableAccessWrapper tableAccessWrapper) {
86 this.tableAccessWrapper = tableAccessWrapper;
87 }
88
89 void setCommitTableClient(CommitTable.Client commitTableClient) {
90 this.commitTableClient = commitTableClient;
91 }
92
93 private String getRowFamilyString(Cell cell) {
94 return Bytes.toString((CellUtil.cloneRow(cell))) + ":" + Bytes.toString(CellUtil.cloneFamily(cell));
95 }
96
97
98
99
100
101
102
103
104
105
106 private boolean checkFamilyDeletionCache(Cell cell, HBaseTransaction transaction, Map<String, Long> familyDeletionCache, Map<Long, Long> commitCache) throws IOException {
107 String key = getRowFamilyString(cell);
108 Long familyDeletionCommitTimestamp = familyDeletionCache.get(key);
109 if (familyDeletionCommitTimestamp != null && familyDeletionCommitTimestamp >= cell.getTimestamp()) {
110 return true;
111 }
112 return false;
113 }
114
115 private void healShadowCell(Cell cell, long commitTimestamp) {
116 Put put = new Put(CellUtil.cloneRow(cell));
117 byte[] family = CellUtil.cloneFamily(cell);
118 byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
119 cell.getQualifierOffset(),
120 cell.getQualifierLength());
121 put.addColumn(family, shadowCellQualifier, cell.getTimestamp(), Bytes.toBytes(commitTimestamp));
122 try {
123 tableAccessWrapper.put(put);
124 } catch (IOException e) {
125 LOG.warn("Failed healing shadow cell for kv {}", cell, e);
126 }
127 }
128
129
130
131
132
133
134
135
136
137 public Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
138 throws IOException
139 {
140
141 Optional<CommitTimestamp> commitTS = Optional.absent();
142
143 Optional<Long> commitTimestamp = locator.readCommitTimestampFromShadowCell(cellStartTimestamp);
144 if (commitTimestamp.isPresent()) {
145 commitTS = Optional.of(new CommitTimestamp(SHADOW_CELL, commitTimestamp.get(), true));
146 }
147
148 return commitTS;
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
167 CommitTimestampLocator locator, boolean isLowLatency) throws IOException {
168
169 try {
170
171 Optional<Long> commitTimestamp = locator.readCommitTimestampFromCache(cellStartTimestamp);
172 if (commitTimestamp.isPresent()) {
173 return new CommitTimestamp(CACHE, commitTimestamp.get(), true);
174 }
175
176
177
178 boolean invalidatedByOther = false;
179 Optional<CommitTimestamp> commitTimestampFromCT = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
180 if (commitTimestampFromCT.isPresent()) {
181 if (isLowLatency && !commitTimestampFromCT.get().isValid())
182 invalidatedByOther = true;
183 else
184 return commitTimestampFromCT.get();
185 }
186
187
188 Optional<CommitTimestamp> commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
189 if (commitTimeStamp.isPresent()) {
190 return commitTimeStamp.get();
191 }
192
193
194 if (invalidatedByOther) {
195 assert(!commitTimestampFromCT.get().isValid());
196 return commitTimestampFromCT.get();
197 }
198
199
200
201 if (cellStartTimestamp < epoch || isLowLatency) {
202 boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
203 if (invalidated) {
204
205
206
207 if (isLowLatency) {
208 commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
209 if (commitTimeStamp.isPresent()) {
210
211 commitTableClient.deleteCommitEntry(cellStartTimestamp);
212 return commitTimeStamp.get();
213 }
214 }
215
216 return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
217 }
218 }
219
220
221 commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
222 if (commitTimeStamp.isPresent()) {
223 return commitTimeStamp.get();
224 }
225
226
227 commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
228 if (commitTimeStamp.isPresent()) {
229 return commitTimeStamp.get();
230 }
231
232
233 return new CommitTimestamp(NOT_PRESENT, -1L , true);
234 } catch (InterruptedException e) {
235 Thread.currentThread().interrupt();
236 throw new IOException("Interrupted while finding commit timestamp", e);
237 } catch (ExecutionException e) {
238 throw new IOException("Problem finding commit timestamp", e);
239 }
240
241 }
242
243 public Optional<Long> tryToLocateCellCommitTimestamp(long epoch,
244 Cell cell,
245 Map<Long, Long> commitCache,
246 boolean isLowLatency)
247 throws IOException {
248
249 CommitTimestamp tentativeCommitTimestamp =
250 locateCellCommitTimestamp(
251 cell.getTimestamp(),
252 epoch,
253 new CommitTimestampLocatorImpl(
254 new HBaseCellId(null,
255 CellUtil.cloneRow(cell),
256 CellUtil.cloneFamily(cell),
257 CellUtil.cloneQualifier(cell),
258 cell.getTimestamp()),
259 commitCache,
260 tableAccessWrapper),
261 isLowLatency);
262
263
264 if (!tentativeCommitTimestamp.isValid()) {
265 return Optional.absent();
266 }
267
268 switch (tentativeCommitTimestamp.getLocation()) {
269 case COMMIT_TABLE:
270
271
272
273
274
275 healShadowCell(cell, tentativeCommitTimestamp.getValue());
276 return Optional.of(tentativeCommitTimestamp.getValue());
277 case CACHE:
278 case SHADOW_CELL:
279 return Optional.of(tentativeCommitTimestamp.getValue());
280 case NOT_PRESENT:
281 return Optional.absent();
282 default:
283 assert (false);
284 return Optional.absent();
285 }
286 }
287
288
289 private Optional<Long> getCommitTimestamp(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
290 throws IOException {
291
292 long startTimestamp = transaction.getStartTimestamp();
293
294 if (kv.getTimestamp() == startTimestamp) {
295 return Optional.of(startTimestamp);
296 }
297
298 if (commitTableClient == null) {
299 assert (transaction.getTransactionManager() != null);
300 commitTableClient = transaction.getTransactionManager().getCommitTableClient();
301 }
302
303 return tryToLocateCellCommitTimestamp(transaction.getEpoch(), kv,
304 commitCache, transaction.isLowLatency());
305 }
306
307 private Map<Long, Long> buildCommitCache(List<Cell> rawCells) {
308
309 Map<Long, Long> commitCache = new HashMap<>();
310
311 for (Cell cell : rawCells) {
312 if (CellUtils.isShadowCell(cell)) {
313 commitCache.put(cell.getTimestamp(), Bytes.toLong(CellUtil.cloneValue(cell)));
314 }
315 }
316
317 return commitCache;
318 }
319
320
321 private void buildFamilyDeletionCache(HBaseTransaction transaction, List<Cell> rawCells, Map<String, Long> familyDeletionCache, Map<Long, Long> commitCache, Map<String,byte[]> attributeMap) throws IOException {
322 for (Cell cell : rawCells) {
323 if (CellUtils.isFamilyDeleteCell(cell)) {
324 String key = getRowFamilyString(cell);
325
326 if (familyDeletionCache.containsKey(key))
327 return;
328
329 Optional<Long> commitTimeStamp = getTSIfInTransaction(cell, transaction);
330
331 if (!commitTimeStamp.isPresent()) {
332 commitTimeStamp = getTSIfInSnapshot(cell, transaction, commitCache);
333 }
334
335 if (commitTimeStamp.isPresent()) {
336 familyDeletionCache.put(key, commitTimeStamp.get());
337 } else {
338 Cell lastCell = cell;
339 Map<Long, Long> cmtCache;
340 boolean foundCommittedFamilyDeletion = false;
341 while (!foundCommittedFamilyDeletion) {
342
343 Get g = createPendingGet(lastCell, 3);
344
345 Result result = tableAccessWrapper.get(g);
346 List<Cell> resultCells = result.listCells();
347 if (resultCells == null) {
348 break;
349 }
350
351 cmtCache = buildCommitCache(resultCells);
352 for (Cell c : resultCells) {
353 if (CellUtils.isFamilyDeleteCell(c)) {
354 commitTimeStamp = getTSIfInSnapshot(c, transaction, cmtCache);
355 if (commitTimeStamp.isPresent()) {
356 familyDeletionCache.put(key, commitTimeStamp.get());
357 foundCommittedFamilyDeletion = true;
358 break;
359 }
360 lastCell = c;
361 }
362 }
363 }
364 }
365 }
366 }
367 }
368
369
370 public Optional<Long> getTSIfInTransaction(Cell kv, HBaseTransaction transaction) {
371 long startTimestamp = transaction.getStartTimestamp();
372 long readTimestamp = transaction.getReadTimestamp();
373
374
375
376
377 if (kv.getTimestamp() >= startTimestamp && kv.getTimestamp() <= readTimestamp) {
378 return Optional.of(kv.getTimestamp());
379 }
380
381 return Optional.absent();
382 }
383
384
385 public Optional<Long> getTSIfInSnapshot(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
386 throws IOException {
387
388 Optional<Long> commitTimestamp = getCommitTimestamp(kv, transaction, commitCache);
389
390 if (commitTimestamp.isPresent() && commitTimestamp.get() < transaction.getStartTimestamp())
391 return commitTimestamp;
392
393 return Optional.absent();
394 }
395
396 private Get createPendingGet(Cell cell, int versionCount) throws IOException {
397
398 Get pendingGet = new Get(CellUtil.cloneRow(cell));
399 pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
400 pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
401 cell.getQualifierOffset(),
402 cell.getQualifierLength()));
403 pendingGet.setMaxVersions(versionCount);
404 pendingGet.setTimeRange(0, cell.getTimestamp());
405
406 return pendingGet;
407 }
408
409
410
411
412
413
414
415
416
417
418
419
420 public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
421 int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
422
423 assert (rawCells != null && transaction != null && versionsToRequest >= 1);
424
425 List<Cell> keyValuesInSnapshot = new ArrayList<>();
426 List<Get> pendingGetsList = new ArrayList<>();
427
428 int numberOfVersionsToFetch = versionsToRequest * 2;
429 if (numberOfVersionsToFetch < 1) {
430 numberOfVersionsToFetch = versionsToRequest;
431 }
432
433 Map<Long, Long> commitCache = buildCommitCache(rawCells);
434 buildFamilyDeletionCache(transaction, rawCells, familyDeletionCache, commitCache, attributeMap);
435
436 ImmutableList<Collection<Cell>> filteredCells;
437 if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) {
438 filteredCells = groupCellsByColumnFilteringShadowCells(rawCells);
439 } else {
440 filteredCells = groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(rawCells);
441 }
442
443 for (Collection<Cell> columnCells : filteredCells) {
444 boolean snapshotValueFound = false;
445 Cell oldestCell = null;
446 for (Cell cell : columnCells) {
447 oldestCell = cell;
448 if (getTSIfInTransaction(cell, transaction).isPresent() ||
449 getTSIfInSnapshot(cell, transaction, commitCache).isPresent()) {
450
451 if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) {
452 keyValuesInSnapshot.add(cell);
453 if (getTSIfInTransaction(cell, transaction).isPresent()) {
454 snapshotValueFound = false;
455 continue;
456 } else {
457 snapshotValueFound = true;
458 break;
459 }
460 } else {
461 if (!checkFamilyDeletionCache(cell, transaction, familyDeletionCache, commitCache) &&
462 !CellUtils.isTombstone(cell)) {
463 keyValuesInSnapshot.add(cell);
464 }
465 snapshotValueFound = true;
466 break;
467
468 }
469 }
470 }
471 if (!snapshotValueFound) {
472 assert (oldestCell != null);
473 Get pendingGet = createPendingGet(oldestCell, numberOfVersionsToFetch);
474 for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
475 pendingGet.setAttribute(entry.getKey(), entry.getValue());
476 }
477 pendingGetsList.add(pendingGet);
478 }
479 }
480
481 if (!pendingGetsList.isEmpty()) {
482 Result[] pendingGetsResults = tableAccessWrapper.get(pendingGetsList);
483 for (Result pendingGetResult : pendingGetsResults) {
484 if (!pendingGetResult.isEmpty()) {
485 keyValuesInSnapshot.addAll(
486 filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache, attributeMap));
487 }
488 }
489 }
490
491 Collections.sort(keyValuesInSnapshot, KeyValue.COMPARATOR);
492
493 return keyValuesInSnapshot;
494 }
495
496 @Override
497 public Result get(Get get, HBaseTransaction transaction) throws IOException {
498 Result result = tableAccessWrapper.get(get);
499
500 List<Cell> filteredKeyValues = Collections.emptyList();
501 if (!result.isEmpty()) {
502 filteredKeyValues = filterCellsForSnapshot(result.listCells(), transaction, get.getMaxVersions(), new HashMap<String, Long>(), get.getAttributesMap());
503 }
504
505 return Result.create(filteredKeyValues);
506 }
507
508 @Override
509 public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException {
510
511 return new TransactionalClientScanner(transaction, scan, 1);
512
513 }
514
515 public boolean isCommitted(HBaseCellId hBaseCellId, long epoch, boolean isLowLatency) throws TransactionException {
516 try {
517 long timestamp = hBaseCellId.getTimestamp();
518 CommitTimestamp tentativeCommitTimestamp =
519 locateCellCommitTimestamp(timestamp,
520 epoch,
521 new CommitTimestampLocatorImpl(hBaseCellId,
522 Maps.<Long, Long>newHashMap(),
523 tableAccessWrapper),
524 isLowLatency);
525
526
527 if (!tentativeCommitTimestamp.isValid()) {
528 return false;
529 }
530
531 switch (tentativeCommitTimestamp.getLocation()) {
532 case COMMIT_TABLE:
533 case SHADOW_CELL:
534 return true;
535 case NOT_PRESENT:
536 return false;
537 case CACHE:
538 default:
539 return false;
540 }
541 } catch (IOException e) {
542 throw new TransactionException("Failure while checking if a transaction was committed", e);
543 }
544 }
545
546 static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(List<Cell> rawCells) {
547
548 Predicate<Cell> shadowCellAndFamilyDeletionFilter = new Predicate<Cell>() {
549
550 @Override
551 public boolean apply(Cell cell) {
552 boolean familyDeletionMarkerCondition = CellUtils.isFamilyDeleteCell(cell);
553
554 return cell != null && !CellUtils.isShadowCell(cell) && !familyDeletionMarkerCondition;
555 }
556
557 };
558
559 Function<Cell, ColumnWrapper> cellToColumnWrapper = new Function<Cell, ColumnWrapper>() {
560
561 @Override
562 public ColumnWrapper apply(Cell cell) {
563 return new ColumnWrapper(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
564 }
565
566 };
567
568 return Multimaps.index(Iterables.filter(rawCells, shadowCellAndFamilyDeletionFilter), cellToColumnWrapper)
569 .asMap().values()
570 .asList();
571 }
572
573
574 static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCells(List<Cell> rawCells) {
575
576 Predicate<Cell> shadowCellFilter = new Predicate<Cell>() {
577 @Override
578 public boolean apply(Cell cell) {
579 return cell != null && !CellUtils.isShadowCell(cell);
580 }
581 };
582
583 Function<Cell, ColumnWrapper> cellToColumnWrapper = new Function<Cell, ColumnWrapper>() {
584
585 @Override
586 public ColumnWrapper apply(Cell cell) {
587 return new ColumnWrapper(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
588 }
589
590 };
591
592 return Multimaps.index(Iterables.filter(rawCells, shadowCellFilter), cellToColumnWrapper)
593 .asMap().values()
594 .asList();
595 }
596
597 @Override
598 public void close() throws Exception {
599 tableAccessWrapper.close();
600 }
601
602
603 public class TransactionalClientScanner implements ResultScanner {
604
605 private HBaseTransaction state;
606 private ResultScanner innerScanner;
607 private int maxVersions;
608 Map<String, Long> familyDeletionCache;
609 private Map<String,byte[]> attributeMap;
610
611 TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions)
612 throws IOException {
613 if (scan.hasFilter()) {
614 LOG.warn("Client scanner with filter will return un expected results. Use Coprocessor scanning");
615 }
616 this.state = state;
617 this.innerScanner = tableAccessWrapper.getScanner(scan);
618 this.maxVersions = maxVersions;
619 this.familyDeletionCache = new HashMap<String, Long>();
620 this.attributeMap = scan.getAttributesMap();
621 }
622
623
624 @Override
625 public Result next() throws IOException {
626 List<Cell> filteredResult = Collections.emptyList();
627 while (filteredResult.isEmpty()) {
628 Result result = innerScanner.next();
629 if (result == null) {
630 return null;
631 }
632 if (!result.isEmpty()) {
633 filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache, attributeMap);
634 }
635 }
636 return Result.create(filteredResult);
637 }
638
639
640
641
642 @Override
643 public Result[] next(int nbRows) throws IOException {
644
645 ArrayList<Result> resultSets = new ArrayList<>(nbRows);
646 for (int i = 0; i < nbRows; i++) {
647 Result next = next();
648 if (next != null) {
649 resultSets.add(next);
650 } else {
651 break;
652 }
653 }
654 return resultSets.toArray(new Result[resultSets.size()]);
655 }
656
657 @Override
658 public void close() {
659 innerScanner.close();
660 }
661
662
663
664
665 public ScanMetrics getScanMetrics() {
666 return null;
667 }
668
669
670 public boolean renewLease() {
671 return false;
672 }
673
674 @Override
675 public Iterator<Result> iterator() {
676 return new ResultIterator(this);
677 }
678
679
680
681
682
683 class ResultIterator implements Iterator<Result> {
684
685 TransactionalClientScanner scanner;
686 Result currentResult;
687
688 ResultIterator(TransactionalClientScanner scanner) {
689 try {
690 this.scanner = scanner;
691 currentResult = scanner.next();
692 } catch (IOException e) {
693 throw new RuntimeException(e);
694 }
695 }
696
697 @Override
698 public boolean hasNext() {
699 return currentResult != null && !currentResult.isEmpty();
700 }
701
702 @Override
703 public Result next() {
704 try {
705 Result result = currentResult;
706 currentResult = scanner.next();
707 return result;
708 } catch (IOException e) {
709 throw new RuntimeException(e);
710 }
711 }
712
713 @Override
714 public void remove() {
715 throw new RuntimeException("Not implemented");
716 }
717
718 }
719
720 }
721
722
723 }