View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
21  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
22  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
23  import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Mockito.doReturn;
26  import static org.mockito.Mockito.doThrow;
27  import static org.mockito.Mockito.spy;
28  import static org.testng.Assert.assertEquals;
29  import static org.testng.Assert.assertFalse;
30  import static org.testng.Assert.assertTrue;
31  
32  import java.util.Map;
33  
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.client.Table;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.omid.committable.CommitTable;
39  import org.apache.omid.committable.CommitTable.CommitTimestamp;
40  import org.apache.omid.metrics.NullMetricsProvider;
41  import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
42  import org.testng.ITestContext;
43  import org.testng.annotations.Test;
44  
45  import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
46  import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
47  import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
48  
49  @Test(groups = "sharedHBase")
50  public class TestHBaseTransactionClient extends OmidTestBase {
51  
52      private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
53      private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
54      private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
55      private static final byte[] qualifier = Bytes.toBytes("testdata");
56      private static final byte[] data1 = Bytes.toBytes("testWrite-1");
57  
58      @Test(timeOut = 30_000)
59      public void testIsCommitted(ITestContext context) throws Exception {
60          TransactionManager tm = newTransactionManager(context);
61          Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
62          SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
63                  ((AbstractTransactionManager)tm).getCommitTableClient());
64          TTable table = spy(new TTable(htable, snapshotFilter, false));
65  
66          HBaseTransaction t1 = (HBaseTransaction) tm.begin();
67  
68          Put put = new Put(row1);
69          put.addColumn(family, qualifier, data1);
70          table.put(t1, put);
71          tm.commit(t1);
72  
73          HBaseTransaction t2 = (HBaseTransaction) tm.begin();
74          put = new Put(row2);
75          put.addColumn(family, qualifier, data1);
76          table.put(t2, put);
77          table.flushCommits();
78  
79          HBaseTransaction t3 = (HBaseTransaction) tm.begin();
80          put = new Put(row2);
81          put.addColumn(family, qualifier, data1);
82          table.put(t3, put);
83          tm.commit(t3);
84  
85          HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
86          HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
87          HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
88  
89          assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
90          assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
91          assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
92      }
93  
94      @Test(timeOut = 30_000)
95      public void testCrashAfterCommit(ITestContext context) throws Exception {
96          PostCommitActions syncPostCommitter =
97                  spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
98          AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
99          // The following line emulates a crash after commit that is observed in (*) below
100         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
101 
102         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
103         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
104                 tm.getCommitTableClient());
105         TTable table = spy(new TTable(htable, snapshotFilter, false));
106 
107         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
108 
109         // Test shadow cell are created properly
110         Put put = new Put(row1);
111         put.addColumn(family, qualifier, data1);
112         table.put(t1, put);
113         try {
114             tm.commit(t1);
115         } catch (Exception e) { // (*) crash
116             // Do nothing
117         }
118 
119         assertTrue(CellUtils.hasCell(row1, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
120                    "Cell should be there");
121         assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
122                     "Shadow cell should not be there");
123 
124         HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
125 
126         HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context);
127         assertTrue(snapshotFilter.isCommitted(hBaseCellId, 0, false), "row1 should be committed");
128     }
129 
130     @Test(timeOut = 30_000)
131     public void testReadCommitTimestampFromCommitTable(ITestContext context) throws Exception {
132 
133         //connection = ConnectionFactory.createConnection(hbaseConf);
134         final long NON_EXISTING_CELL_TS = 1000L;
135 
136         PostCommitActions syncPostCommitter =
137                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
138         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
139         // The following line emulates a crash after commit that is observed in (*) below
140         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
141 
142         // Test that a non-existing cell timestamp returns an empty result
143         Optional<CommitTimestamp> optionalCT = tm.commitTableClient.getCommitTimestamp(NON_EXISTING_CELL_TS).get();
144         assertFalse(optionalCT.isPresent());
145 
146         try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
147             // Test that we get an invalidation mark for an invalidated transaction
148 
149             // Start a transaction and invalidate it before commiting it
150             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
151             Put put = new Put(row1);
152             put.addColumn(family, qualifier, data1);
153             table.put(tx1, put);
154 
155             assertTrue(tm.commitTableClient.tryInvalidateTransaction(tx1.getStartTimestamp()).get());
156             optionalCT = tm.commitTableClient.getCommitTimestamp(tx1.getStartTimestamp()).get();
157             assertTrue(optionalCT.isPresent());
158             CommitTimestamp ct = optionalCT.get();
159             assertFalse(ct.isValid());
160             assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
161             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
162 
163             // Finally test that we get the right commit timestamp for a committed tx
164             // that couldn't get
165             HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
166             Put otherPut = new Put(row1);
167             otherPut.addColumn(family, qualifier, data1);
168             table.put(tx2, otherPut);
169             try {
170                 tm.commit(tx2);
171             } catch (Exception e) { // (*) crash
172                 // Do nothing
173             }
174 
175             optionalCT = tm.commitTableClient.getCommitTimestamp(tx2.getStartTimestamp()).get();
176             assertTrue(optionalCT.isPresent());
177             ct = optionalCT.get();
178             assertTrue(ct.isValid());
179             assertEquals(ct.getValue(), tx2.getCommitTimestamp());
180             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
181         }
182     }
183 
184     @Test(timeOut = 30_000)
185     public void testReadCommitTimestampFromShadowCell(ITestContext context) throws Exception {
186 
187         final long NON_EXISTING_CELL_TS = 1L;
188 
189         HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
190 
191         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
192         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
193                 tm.getCommitTableClient());
194 
195         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
196 
197             // Test first we can not found a non-existent cell ts
198             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, NON_EXISTING_CELL_TS);
199             // Set an empty cache to allow to bypass the checking
200             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
201                     Maps.<Long, Long>newHashMap());
202             Optional<CommitTimestamp> optionalCT = snapshotFilter
203                     .readCommitTimestampFromShadowCell(NON_EXISTING_CELL_TS, ctLocator);
204             assertFalse(optionalCT.isPresent());
205 
206             // Then test that for a transaction committed, we get the right CT
207             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
208             Put put = new Put(row1);
209             put.addColumn(family, qualifier, data1);
210             table.put(tx1, put);
211             tm.commit(tx1);
212             // Upon commit, the commit data should be in the shadow cells, so test it
213             optionalCT = snapshotFilter.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator);
214             assertTrue(optionalCT.isPresent());
215             CommitTimestamp ct = optionalCT.get();
216             assertTrue(ct.isValid());
217             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
218             assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
219 
220         }
221 
222     }
223 
224     // Tests step 1 in AbstractTransactionManager.locateCellCommitTimestamp()
225     @Test(timeOut = 30_000)
226     public void testCellCommitTimestampIsLocatedInCache(ITestContext context) throws Exception {
227 
228         final long CELL_ST = 1L;
229         final long CELL_CT = 2L;
230 
231         HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
232 
233         // Pre-load the element to look for in the cache
234         Table htable = hBaseUtils.getConnection().getTable(TableName.valueOf(TEST_TABLE));
235         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
236                 tm.getCommitTableClient());
237         TTable table = new TTable(htable, snapshotFilter, false);
238 
239         HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_ST);
240         Map<Long, Long> fakeCache = Maps.newHashMap();
241         fakeCache.put(CELL_ST, CELL_CT);
242 
243         // Then test that locator finds it in the cache
244         CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, fakeCache);
245         CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator,
246                 false);
247         assertTrue(ct.isValid());
248         assertEquals(ct.getValue(), CELL_CT);
249         assertTrue(ct.getLocation().compareTo(CACHE) == 0);
250 
251     }
252 
253     // Tests step 2 in AbstractTransactionManager.locateCellCommitTimestamp()
254     // Note: This test is very similar to testCrashAfterCommit() above so
255     // maybe we should merge them in this test, adding the missing assertions
256     @Test(timeOut = 30_000)
257     public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {
258 
259         PostCommitActions syncPostCommitter =
260                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
261         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
262         // The following line emulates a crash after commit that is observed in (*) below
263         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
264 
265         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
266         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
267                 tm.getCommitTableClient());
268 
269         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
270             // Commit a transaction that is broken on commit to avoid
271             // write to the shadow cells and avoid cleaning the commit table
272             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
273             Put put = new Put(row1);
274             put.addColumn(family, qualifier, data1);
275             table.put(tx1, put);
276             try {
277                 tm.commit(tx1);
278             } catch (Exception e) { // (*) crash
279                 // Do nothing
280             }
281 
282             // Test the locator finds the appropriate data in the commit table
283             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
284                     tx1.getStartTimestamp());
285             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
286                     Maps.<Long, Long>newHashMap());
287             CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
288                     ctLocator, false);
289             assertTrue(ct.isValid());
290             long expectedCommitTS = tx1.getStartTimestamp() + CommitTable.MAX_CHECKPOINTS_PER_TXN;
291             assertEquals(ct.getValue(), expectedCommitTS);
292             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
293         }
294 
295     }
296 
297     // Tests step 3 in AbstractTransactionManager.locateCellCommitTimestamp()
298     @Test(timeOut = 30_000)
299     public void testCellCommitTimestampIsLocatedInShadowCells(ITestContext context) throws Exception {
300 
301         HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
302 
303         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
304         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
305                 tm.getCommitTableClient());
306 
307         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
308             // Commit a transaction to addColumn ST/CT in commit table
309             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
310             Put put = new Put(row1);
311             put.addColumn(family, qualifier, data1);
312             table.put(tx1, put);
313             tm.commit(tx1);
314             // Upon commit, the commit data should be in the shadow cells
315 
316             // Test the locator finds the appropriate data in the shadow cells
317             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
318                     tx1.getStartTimestamp());
319             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
320                     Maps.<Long, Long>newHashMap());
321             CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
322                     ctLocator, false);
323             assertTrue(ct.isValid());
324             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
325             assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
326         }
327 
328     }
329 
330     // Tests step 4 in AbstractTransactionManager.locateCellCommitTimestamp()
331     @Test(timeOut = 30_000)
332     public void testCellFromTransactionInPreviousEpochGetsInvalidComitTimestamp(ITestContext context) throws Exception {
333 
334         final long CURRENT_EPOCH_FAKE = 1000L * CommitTable.MAX_CHECKPOINTS_PER_TXN;
335 
336         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
337         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
338         // The following lines allow to reach step 4)
339         // in AbstractTransactionManager.locateCellCommitTimestamp()
340         SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
341         f.set(Optional.<CommitTimestamp>absent());
342         doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
343 
344         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
345         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
346                 tm.getCommitTableClient());
347 
348         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
349 
350             // Commit a transaction to addColumn ST/CT in commit table
351             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
352             Put put = new Put(row1);
353             put.addColumn(family, qualifier, data1);
354             table.put(tx1, put);
355             // Upon commit, the commit data should be in the shadow cells
356 
357             // Test a transaction in the previous epoch gets an InvalidCommitTimestamp class
358             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
359                     tx1.getStartTimestamp());
360             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
361                     Maps.<Long, Long>newHashMap());
362             // Fake the current epoch to simulate a newer TSO
363             CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE,
364                     ctLocator, false);
365             assertFalse(ct.isValid());
366             assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
367             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
368         }
369     }
370 
371     // Tests step 5 in AbstractTransactionManager.locateCellCommitTimestamp()
372     @Test(timeOut = 30_000)
373     public void testCellCommitTimestampIsLocatedInCommitTableAfterNotBeingInvalidated(ITestContext context) throws Exception {
374 
375         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
376         PostCommitActions syncPostCommitter =
377                 spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
378         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, syncPostCommitter));
379 
380         // The following line emulates a crash after commit that is observed in (*) below
381         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
382         // The next two lines avoid steps 2) and 3) and go directly to step 5)
383         // in AbstractTransactionManager.locateCellCommitTimestamp()
384         SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
385         f.set(Optional.<CommitTimestamp>absent());
386         doReturn(f).doCallRealMethod().when(commitTableClient).getCommitTimestamp(any(Long.class));
387 
388         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
389         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
390                 tm.getCommitTableClient());
391 
392         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
393 
394             // Commit a transaction that is broken on commit to avoid
395             // write to the shadow cells and avoid cleaning the commit table
396             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
397             Put put = new Put(row1);
398             put.addColumn(family, qualifier, data1);
399             table.put(tx1, put);
400             try {
401                 tm.commit(tx1);
402             } catch (Exception e) { // (*) crash
403                 // Do nothing
404             }
405 
406             // Test the locator finds the appropriate data in the commit table
407             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
408                     tx1.getStartTimestamp());
409             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
410                     Maps.<Long, Long>newHashMap());
411             CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
412                     ctLocator, false);
413             assertTrue(ct.isValid());
414             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
415             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
416         }
417 
418     }
419 
420     // Tests step 6 in AbstractTransactionManager.locateCellCommitTimestamp()
421     @Test(timeOut = 30_000)
422     public void testCellCommitTimestampIsLocatedInShadowCellsAfterNotBeingInvalidated(ITestContext context) throws Exception {
423 
424         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
425         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
426         // The next two lines avoid steps 2), 3) and 5) and go directly to step 6)
427         // in AbstractTransactionManager.locateCellCommitTimestamp()
428         SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
429         f.set(Optional.<CommitTimestamp>absent());
430         doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
431 
432         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
433         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
434                 tm.getCommitTableClient());
435 
436         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
437 
438             // Commit a transaction to addColumn ST/CT in commit table
439             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
440             Put put = new Put(row1);
441             put.addColumn(family, qualifier, data1);
442             table.put(tx1, put);
443             tm.commit(tx1);
444             // Upon commit, the commit data should be in the shadow cells
445 
446             // Test the locator finds the appropriate data in the shadow cells
447             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier,
448                     tx1.getStartTimestamp());
449             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
450                     Maps.<Long, Long>newHashMap());
451             CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
452                     ctLocator,false);
453             assertTrue(ct.isValid());
454             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
455             assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
456         }
457 
458     }
459 
460     // Tests last step in AbstractTransactionManager.locateCellCommitTimestamp()
461     @Test(timeOut = 30_000)
462     public void testCTLocatorReturnsAValidCTWhenNotPresent(ITestContext context) throws Exception {
463 
464         final long CELL_TS = 1L;
465 
466         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
467         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
468         // The following lines allow to reach the last return statement
469         SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
470         f.set(Optional.<CommitTimestamp>absent());
471         doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
472 
473         Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
474         SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
475                 tm.getCommitTableClient());
476 
477         try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
478             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_TS);
479             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
480                     Maps.<Long, Long>newHashMap());
481             CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(),
482                     ctLocator, false);
483             assertTrue(ct.isValid());
484             assertEquals(ct.getValue(), -1L);
485             assertTrue(ct.getLocation().compareTo(NOT_PRESENT) == 0);
486         }
487 
488     }
489 }