View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
21  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
22  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
23  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.ExecutionException;
34  
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.omid.committable.CommitTable;
46  import org.apache.omid.committable.CommitTable.CommitTimestamp;
47  import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
48  import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import org.apache.phoenix.thirdparty.com.google.common.base.Function;
53  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
54  import org.apache.phoenix.thirdparty.com.google.common.base.Predicate;
55  import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
56  import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
57  import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
58  import org.apache.phoenix.thirdparty.com.google.common.collect.Multimaps;
59  
60  public class SnapshotFilterImpl implements SnapshotFilter {
61  
62      private static Logger LOG = LoggerFactory.getLogger(SnapshotFilterImpl.class);
63  
64      private TableAccessWrapper tableAccessWrapper;
65  
66      private CommitTable.Client commitTableClient;
67  
68      public TableAccessWrapper getTableAccessWrapper() {
69          return tableAccessWrapper;
70      }
71  
72      public SnapshotFilterImpl(TableAccessWrapper tableAccessWrapper, CommitTable.Client commitTableClient) throws IOException {
73          this.tableAccessWrapper = tableAccessWrapper;
74          this.commitTableClient = commitTableClient;
75      }
76  
77      public SnapshotFilterImpl(TableAccessWrapper tableAccessWrapper) throws IOException {
78          this(tableAccessWrapper, null);
79      }
80  
81      public SnapshotFilterImpl(CommitTable.Client commitTableClient) throws IOException {
82          this(null, commitTableClient);
83      }
84  
85      void setTableAccessWrapper(TableAccessWrapper tableAccessWrapper) {
86          this.tableAccessWrapper = tableAccessWrapper;
87      }
88  
89      void setCommitTableClient(CommitTable.Client commitTableClient) {
90          this.commitTableClient = commitTableClient;
91      }
92  
93      private String getRowFamilyString(Cell cell) {
94          return Bytes.toString((CellUtil.cloneRow(cell))) + ":" + Bytes.toString(CellUtil.cloneFamily(cell));
95      }
96  
97      /**
98       * Check whether a cell was deleted using family deletion marker
99       *
100      * @param cell                The cell to check
101      * @param transaction         Defines the current snapshot
102      * @param familyDeletionCache Accumulates the family deletion markers to identify cells that deleted with a higher version
103      * @param commitCache         Holds shadow cells information
104      * @return Whether the cell was deleted
105      */
106     private boolean checkFamilyDeletionCache(Cell cell, HBaseTransaction transaction, Map<String, Long> familyDeletionCache, Map<Long, Long> commitCache) throws IOException {
107         String key = getRowFamilyString(cell);
108         Long familyDeletionCommitTimestamp = familyDeletionCache.get(key);
109         if (familyDeletionCommitTimestamp != null && familyDeletionCommitTimestamp >= cell.getTimestamp()) {
110             return true;
111         }
112         return false;
113     }
114 
115     private void healShadowCell(Cell cell, long commitTimestamp) {
116         Put put = new Put(CellUtil.cloneRow(cell));
117         byte[] family = CellUtil.cloneFamily(cell);
118         byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
119                                                                    cell.getQualifierOffset(),
120                                                                    cell.getQualifierLength());
121         put.addColumn(family, shadowCellQualifier, cell.getTimestamp(), Bytes.toBytes(commitTimestamp));
122         try {
123             tableAccessWrapper.put(put);
124         } catch (IOException e) {
125             LOG.warn("Failed healing shadow cell for kv {}", cell, e);
126         }
127     }
128 
129     /**
130      * Check if the transaction commit data is in the shadow cell
131      * @param cellStartTimestamp
132      *            the transaction start timestamp
133      *        locator
134      *            the timestamp locator
135      * @throws IOException
136      */
137     public Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
138             throws IOException
139     {
140 
141         Optional<CommitTimestamp> commitTS = Optional.absent();
142 
143         Optional<Long> commitTimestamp = locator.readCommitTimestampFromShadowCell(cellStartTimestamp);
144         if (commitTimestamp.isPresent()) {
145             commitTS = Optional.of(new CommitTimestamp(SHADOW_CELL, commitTimestamp.get(), true)); // Valid commit TS
146         }
147 
148         return commitTS;
149     }
150 
151     /**
152      * This function returns the commit timestamp for a particular cell if the transaction was already committed in
153      * the system. In case the transaction was not committed and the cell was written by transaction initialized by a
154      * previous TSO server, an invalidation try occurs.
155      * Otherwise the function returns a value that indicates that the commit timestamp was not found.
156      * @param cellStartTimestamp
157      *          start timestamp of the cell to locate the commit timestamp for.
158      * @param epoch
159      *          the epoch of the TSO server the current tso client is working with.
160      * @param locator
161      *          a locator to find the commit timestamp in the system.
162      * @return the commit timestamp joint with the location where it was found
163      *         or an object indicating that it was not found in the system
164      * @throws IOException  in case of any I/O issues
165      */
166     public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
167                                                      CommitTimestampLocator locator, boolean isLowLatency) throws IOException {
168 
169         try {
170             // 1) First check the cache
171             Optional<Long> commitTimestamp = locator.readCommitTimestampFromCache(cellStartTimestamp);
172             if (commitTimestamp.isPresent()) { // Valid commit timestamp
173                 return new CommitTimestamp(CACHE, commitTimestamp.get(), true);
174             }
175 
176             // 2) Then check the commit table
177             // If the data was written at a previous epoch, check whether the transaction was invalidated
178             boolean invalidatedByOther = false;
179             Optional<CommitTimestamp> commitTimestampFromCT = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
180             if (commitTimestampFromCT.isPresent()) {
181                 if (isLowLatency && !commitTimestampFromCT.get().isValid())
182                     invalidatedByOther = true;
183                 else
184                     return commitTimestampFromCT.get();
185             }
186 
187             // 3) Read from shadow cell
188             Optional<CommitTimestamp> commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
189             if (commitTimeStamp.isPresent()) {
190                 return commitTimeStamp.get();
191             }
192 
193             // In case of LL, if found invalid ct cell, still must check sc in stage 3 then return
194             if (invalidatedByOther) {
195                 assert(!commitTimestampFromCT.get().isValid());
196                 return commitTimestampFromCT.get();
197             }
198 
199             // 4) Check the epoch and invalidate the entry
200             // if the data was written by a transaction from a previous epoch (previous TSO)
201             if (cellStartTimestamp < epoch || isLowLatency) {
202                 boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
203                 if (invalidated) { // Invalid commit timestamp
204 
205                     // If we are running lowLatency Omid, we could have manged to invalidate a ct entry,
206                     // but the committing client already wrote to shadow cells:
207                     if (isLowLatency) {
208                         commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
209                         if (commitTimeStamp.isPresent()) {
210                             // Remove false invalidation from commit table
211                             commitTableClient.deleteCommitEntry(cellStartTimestamp);
212                             return commitTimeStamp.get();
213                         }
214                     }
215 
216                     return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
217                 }
218             }
219 
220             // 5) We did not manage to invalidate the transactions then check the commit table
221             commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
222             if (commitTimeStamp.isPresent()) {
223                 return commitTimeStamp.get();
224             }
225 
226             // 6) Read from shadow cell
227             commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
228             if (commitTimeStamp.isPresent()) {
229                 return commitTimeStamp.get();
230             }
231 
232             // *) Otherwise return not found
233             return new CommitTimestamp(NOT_PRESENT, -1L /** TODO Check if we should return this */, true);
234         } catch (InterruptedException e) {
235             Thread.currentThread().interrupt();
236             throw new IOException("Interrupted while finding commit timestamp", e);
237         } catch (ExecutionException e) {
238             throw new IOException("Problem finding commit timestamp", e);
239         }
240 
241     }
242 
243     public Optional<Long> tryToLocateCellCommitTimestamp(long epoch,
244                                                          Cell cell,
245                                                          Map<Long, Long> commitCache,
246                                                          boolean isLowLatency)
247                     throws IOException {
248 
249         CommitTimestamp tentativeCommitTimestamp =
250                 locateCellCommitTimestamp(
251                         cell.getTimestamp(),
252                         epoch,
253                         new CommitTimestampLocatorImpl(
254                                 new HBaseCellId(null,
255                                         CellUtil.cloneRow(cell),
256                                         CellUtil.cloneFamily(cell),
257                                         CellUtil.cloneQualifier(cell),
258                                         cell.getTimestamp()),
259                                         commitCache,
260                                         tableAccessWrapper),
261                         isLowLatency);
262 
263         // If transaction that added the cell was invalidated
264         if (!tentativeCommitTimestamp.isValid()) {
265             return Optional.absent();
266         }
267 
268         switch (tentativeCommitTimestamp.getLocation()) {
269         case COMMIT_TABLE:
270             // If the commit timestamp is found in the persisted commit table,
271             // that means the writing process of the shadow cell in the post
272             // commit phase of the client probably failed, so we heal the shadow
273             // cell with the right commit timestamp for avoiding further reads to
274             // hit the storage
275             healShadowCell(cell, tentativeCommitTimestamp.getValue());
276             return Optional.of(tentativeCommitTimestamp.getValue());
277         case CACHE:
278         case SHADOW_CELL:
279             return Optional.of(tentativeCommitTimestamp.getValue());
280         case NOT_PRESENT:
281             return Optional.absent();
282         default:
283             assert (false);
284             return Optional.absent();
285         }
286     }
287 
288 
289     private Optional<Long> getCommitTimestamp(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
290             throws IOException {
291 
292         long startTimestamp = transaction.getStartTimestamp();
293 
294         if (kv.getTimestamp() == startTimestamp) {
295             return Optional.of(startTimestamp);
296         }
297 
298         if (commitTableClient == null) {
299             assert (transaction.getTransactionManager() != null);
300             commitTableClient = transaction.getTransactionManager().getCommitTableClient();
301         }
302 
303         return tryToLocateCellCommitTimestamp(transaction.getEpoch(), kv,
304                 commitCache, transaction.isLowLatency());
305     }
306     
307     private Map<Long, Long> buildCommitCache(List<Cell> rawCells) {
308 
309         Map<Long, Long> commitCache = new HashMap<>();
310 
311         for (Cell cell : rawCells) {
312             if (CellUtils.isShadowCell(cell)) {
313                 commitCache.put(cell.getTimestamp(), Bytes.toLong(CellUtil.cloneValue(cell)));
314             }
315         }
316 
317         return commitCache;
318     }
319 
320 
321     private void buildFamilyDeletionCache(HBaseTransaction transaction, List<Cell> rawCells, Map<String, Long> familyDeletionCache, Map<Long, Long> commitCache, Map<String,byte[]> attributeMap) throws IOException {
322         for (Cell cell : rawCells) {
323             if (CellUtils.isFamilyDeleteCell(cell)) {
324                 String key = getRowFamilyString(cell);
325 
326                 if (familyDeletionCache.containsKey(key))
327                     return;
328 
329                 Optional<Long> commitTimeStamp = getTSIfInTransaction(cell, transaction);
330 
331                 if (!commitTimeStamp.isPresent()) {
332                     commitTimeStamp = getTSIfInSnapshot(cell, transaction, commitCache);
333                 }
334 
335                 if (commitTimeStamp.isPresent()) {
336                     familyDeletionCache.put(key, commitTimeStamp.get());
337                 } else {
338                     Cell lastCell = cell;
339                     Map<Long, Long> cmtCache;
340                     boolean foundCommittedFamilyDeletion = false;
341                     while (!foundCommittedFamilyDeletion) {
342 
343                         Get g = createPendingGet(lastCell, 3);
344 
345                         Result result = tableAccessWrapper.get(g);
346                         List<Cell> resultCells = result.listCells();
347                         if (resultCells == null) {
348                             break;
349                         }
350 
351                         cmtCache = buildCommitCache(resultCells);
352                         for (Cell c : resultCells) {
353                             if (CellUtils.isFamilyDeleteCell(c)) {
354                                     commitTimeStamp = getTSIfInSnapshot(c, transaction, cmtCache);
355                                     if (commitTimeStamp.isPresent()) {
356                                         familyDeletionCache.put(key, commitTimeStamp.get());
357                                         foundCommittedFamilyDeletion = true;
358                                         break;
359                                     }
360                                     lastCell = c;
361                             }
362                         }
363                     }
364                 }
365             }
366         }
367     }
368 
369 
370     public Optional<Long> getTSIfInTransaction(Cell kv, HBaseTransaction transaction) {
371         long startTimestamp = transaction.getStartTimestamp();
372         long readTimestamp = transaction.getReadTimestamp();
373 
374         // A cell was written by a transaction if its timestamp is larger than its startTimestamp and smaller or equal to its readTimestamp.
375         // There also might be a case where the cell was written by the transaction and its timestamp equals to its writeTimestamp, however,
376         // this case occurs after checkpoint and in this case we do not want to read this data.
377         if (kv.getTimestamp() >= startTimestamp && kv.getTimestamp() <= readTimestamp) {
378             return Optional.of(kv.getTimestamp());
379         }
380 
381         return Optional.absent();
382     }
383 
384 
385     public Optional<Long> getTSIfInSnapshot(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
386         throws IOException {
387 
388         Optional<Long> commitTimestamp = getCommitTimestamp(kv, transaction, commitCache);
389 
390         if (commitTimestamp.isPresent() && commitTimestamp.get() < transaction.getStartTimestamp())
391             return commitTimestamp;
392 
393         return Optional.absent();
394     }
395 
396     private Get createPendingGet(Cell cell, int versionCount) throws IOException {
397 
398         Get pendingGet = new Get(CellUtil.cloneRow(cell));
399         pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
400         pendingGet.addColumn(CellUtil.cloneFamily(cell), CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
401                                                                                        cell.getQualifierOffset(),
402                                                                                        cell.getQualifierLength()));
403         pendingGet.setMaxVersions(versionCount);
404         pendingGet.setTimeRange(0, cell.getTimestamp());
405 
406         return pendingGet;
407     }
408 
409     /**
410      * Filters the raw results returned from HBase and returns only those belonging to the current snapshot, as defined
411      * by the transaction object. If the raw results don't contain enough information for a particular qualifier, it
412      * will request more versions from HBase.
413      *
414      * @param rawCells          Raw cells that we are going to filter
415      * @param transaction       Defines the current snapshot
416      * @param versionsToRequest Number of versions requested from hbase
417      * @param familyDeletionCache Accumulates the family deletion markers to identify cells that deleted with a higher version
418      * @return Filtered KVs belonging to the transaction snapshot
419      */
420     public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
421                                       int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
422 
423         assert (rawCells != null && transaction != null && versionsToRequest >= 1);
424 
425         List<Cell> keyValuesInSnapshot = new ArrayList<>();
426         List<Get> pendingGetsList = new ArrayList<>();
427 
428         int numberOfVersionsToFetch = versionsToRequest * 2;
429         if (numberOfVersionsToFetch < 1) {
430             numberOfVersionsToFetch = versionsToRequest;
431         }
432 
433         Map<Long, Long> commitCache = buildCommitCache(rawCells);
434         buildFamilyDeletionCache(transaction, rawCells, familyDeletionCache, commitCache, attributeMap);
435 
436         ImmutableList<Collection<Cell>> filteredCells;
437         if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) {
438             filteredCells = groupCellsByColumnFilteringShadowCells(rawCells);
439         } else {
440             filteredCells = groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(rawCells);
441         }
442 
443         for (Collection<Cell> columnCells : filteredCells) {
444             boolean snapshotValueFound = false;
445             Cell oldestCell = null;
446             for (Cell cell : columnCells) {
447                 oldestCell = cell;
448                 if (getTSIfInTransaction(cell, transaction).isPresent() ||
449                         getTSIfInSnapshot(cell, transaction, commitCache).isPresent()) {
450 
451                     if (transaction.getVisibilityLevel() == VisibilityLevel.SNAPSHOT_ALL) {
452                         keyValuesInSnapshot.add(cell);
453                         if (getTSIfInTransaction(cell, transaction).isPresent()) {
454                             snapshotValueFound = false;
455                             continue;
456                         } else {
457                             snapshotValueFound = true;
458                             break;
459                         }
460                     } else {
461                         if (!checkFamilyDeletionCache(cell, transaction, familyDeletionCache, commitCache) &&
462                                 !CellUtils.isTombstone(cell)) {
463                             keyValuesInSnapshot.add(cell);
464                         }
465                         snapshotValueFound = true;
466                         break;
467 
468                     }
469                 }
470             }
471             if (!snapshotValueFound) {
472                 assert (oldestCell != null);
473                 Get pendingGet = createPendingGet(oldestCell, numberOfVersionsToFetch);
474                 for (Map.Entry<String,byte[]> entry : attributeMap.entrySet()) {
475                     pendingGet.setAttribute(entry.getKey(), entry.getValue());
476                 }
477                 pendingGetsList.add(pendingGet);
478             }
479         }
480 
481         if (!pendingGetsList.isEmpty()) {
482             Result[] pendingGetsResults = tableAccessWrapper.get(pendingGetsList);
483             for (Result pendingGetResult : pendingGetsResults) {
484                 if (!pendingGetResult.isEmpty()) {
485                     keyValuesInSnapshot.addAll(
486                         filterCellsForSnapshot(pendingGetResult.listCells(), transaction, numberOfVersionsToFetch, familyDeletionCache, attributeMap));
487                 }
488             }
489         }
490 
491         Collections.sort(keyValuesInSnapshot, KeyValue.COMPARATOR);
492 
493         return keyValuesInSnapshot;
494     }
495 
496     @Override
497     public Result get(Get get, HBaseTransaction transaction) throws IOException {
498         Result result = tableAccessWrapper.get(get);
499 
500         List<Cell> filteredKeyValues = Collections.emptyList();
501         if (!result.isEmpty()) {
502             filteredKeyValues = filterCellsForSnapshot(result.listCells(), transaction, get.getMaxVersions(), new HashMap<String, Long>(), get.getAttributesMap());
503         }
504 
505         return Result.create(filteredKeyValues);
506     }
507 
508     @Override
509     public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException {
510 
511         return new TransactionalClientScanner(transaction, scan, 1);
512 
513     }
514 
515     public boolean isCommitted(HBaseCellId hBaseCellId, long epoch, boolean isLowLatency) throws TransactionException {
516         try {
517             long timestamp = hBaseCellId.getTimestamp();
518             CommitTimestamp tentativeCommitTimestamp =
519                     locateCellCommitTimestamp(timestamp,
520                             epoch,
521                             new CommitTimestampLocatorImpl(hBaseCellId,
522                                     Maps.<Long, Long>newHashMap(),
523                                     tableAccessWrapper),
524                             isLowLatency);
525 
526             // If transaction that added the cell was invalidated
527             if (!tentativeCommitTimestamp.isValid()) {
528                 return false;
529             }
530 
531             switch (tentativeCommitTimestamp.getLocation()) {
532                 case COMMIT_TABLE:
533                 case SHADOW_CELL:
534                     return true;
535                 case NOT_PRESENT:
536                     return false;
537                 case CACHE: // cache was empty
538                 default:
539                     return false;
540             }
541         } catch (IOException e) {
542             throw new TransactionException("Failure while checking if a transaction was committed", e);
543         }
544     }
545 
546     static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(List<Cell> rawCells) {
547 
548         Predicate<Cell> shadowCellAndFamilyDeletionFilter = new Predicate<Cell>() {
549 
550             @Override
551             public boolean apply(Cell cell) {
552                 boolean familyDeletionMarkerCondition = CellUtils.isFamilyDeleteCell(cell);
553 
554                 return cell != null && !CellUtils.isShadowCell(cell) && !familyDeletionMarkerCondition;
555             }
556 
557         };
558 
559         Function<Cell, ColumnWrapper> cellToColumnWrapper = new Function<Cell, ColumnWrapper>() {
560 
561             @Override
562             public ColumnWrapper apply(Cell cell) {
563                 return new ColumnWrapper(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
564             }
565 
566         };
567 
568         return Multimaps.index(Iterables.filter(rawCells, shadowCellAndFamilyDeletionFilter), cellToColumnWrapper)
569             .asMap().values()
570             .asList();
571     }
572 
573 
574     static ImmutableList<Collection<Cell>> groupCellsByColumnFilteringShadowCells(List<Cell> rawCells) {
575 
576         Predicate<Cell> shadowCellFilter = new Predicate<Cell>() {
577             @Override
578             public boolean apply(Cell cell) {
579                 return cell != null && !CellUtils.isShadowCell(cell);
580             }
581         };
582 
583         Function<Cell, ColumnWrapper> cellToColumnWrapper = new Function<Cell, ColumnWrapper>() {
584 
585             @Override
586             public ColumnWrapper apply(Cell cell) {
587                 return new ColumnWrapper(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
588             }
589 
590         };
591 
592         return Multimaps.index(Iterables.filter(rawCells, shadowCellFilter), cellToColumnWrapper)
593                 .asMap().values()
594                 .asList();
595     }
596 
597     @Override
598     public void close() throws Exception {
599         tableAccessWrapper.close();
600     }
601 
602 
603     public class TransactionalClientScanner implements ResultScanner {
604 
605         private HBaseTransaction state;
606         private ResultScanner innerScanner;
607         private int maxVersions;
608         Map<String, Long> familyDeletionCache;
609         private Map<String,byte[]> attributeMap;
610 
611         TransactionalClientScanner(HBaseTransaction state, Scan scan, int maxVersions)
612                 throws IOException {
613             if (scan.hasFilter()) {
614                 LOG.warn("Client scanner with filter will return un expected results. Use Coprocessor scanning");
615             }
616             this.state = state;
617             this.innerScanner = tableAccessWrapper.getScanner(scan);
618             this.maxVersions = maxVersions;
619             this.familyDeletionCache = new HashMap<String, Long>();
620             this.attributeMap = scan.getAttributesMap();
621         }
622 
623 
624         @Override
625         public Result next() throws IOException {
626             List<Cell> filteredResult = Collections.emptyList();
627             while (filteredResult.isEmpty()) {
628                 Result result = innerScanner.next();
629                 if (result == null) {
630                     return null;
631                 }
632                 if (!result.isEmpty()) {
633                     filteredResult = filterCellsForSnapshot(result.listCells(), state, maxVersions, familyDeletionCache, attributeMap);
634                 }
635             }
636             return Result.create(filteredResult);
637         }
638 
639         // In principle no need to override, copied from super.next(int) to make
640         // sure it works even if super.next(int)
641         // changes its implementation
642         @Override
643         public Result[] next(int nbRows) throws IOException {
644             // Collect values to be returned here
645             ArrayList<Result> resultSets = new ArrayList<>(nbRows);
646             for (int i = 0; i < nbRows; i++) {
647                 Result next = next();
648                 if (next != null) {
649                     resultSets.add(next);
650                 } else {
651                     break;
652                 }
653             }
654             return resultSets.toArray(new Result[resultSets.size()]);
655         }
656 
657         @Override
658         public void close() {
659             innerScanner.close();
660         }
661         
662         // So that Omid works with both HBase 1.3 and 1.4 without needing
663         // a new profile. Since this doesn't existing in 1.3, we don't
664         // add an @Override for it.
665         public ScanMetrics getScanMetrics() {
666             return null;
667         }
668         
669         // Same as above
670         public boolean renewLease() {
671             return false;
672         }
673 
674         @Override
675         public Iterator<Result> iterator() {
676             return new ResultIterator(this);
677         }
678 
679         // ------------------------------------------------------------------------------------------------------------
680         // --------------------------------- Helper class for TransactionalClientScanner ------------------------------
681         // ------------------------------------------------------------------------------------------------------------
682 
683         class ResultIterator implements Iterator<Result> {
684 
685             TransactionalClientScanner scanner;
686             Result currentResult;
687 
688             ResultIterator(TransactionalClientScanner scanner) {
689                 try {
690                     this.scanner = scanner;
691                     currentResult = scanner.next();
692                 } catch (IOException e) {
693                     throw new RuntimeException(e);
694                 }
695             }
696 
697             @Override
698             public boolean hasNext() {
699                 return currentResult != null && !currentResult.isEmpty();
700             }
701 
702             @Override
703             public Result next() {
704                 try {
705                     Result result = currentResult;
706                     currentResult = scanner.next();
707                     return result;
708                 } catch (IOException e) {
709                     throw new RuntimeException(e);
710                 }
711             }
712 
713             @Override
714             public void remove() {
715                 throw new RuntimeException("Not implemented");
716             }
717 
718         }
719 
720     }
721 
722 
723 }