View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.slf4j.LoggerFactory.getLogger;
21  import static org.testng.Assert.assertEquals;
22  import static org.testng.Assert.assertNull;
23  import static org.testng.Assert.fail;
24  
25  import java.io.IOException;
26  import java.util.Arrays;
27  
28  import org.apache.hadoop.hbase.client.Delete;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.filter.CompareFilter;
34  import org.apache.hadoop.hbase.filter.Filter;
35  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.slf4j.Logger;
38  import org.testng.ITestContext;
39  import org.testng.annotations.BeforeMethod;
40  import org.testng.annotations.Test;
41  
42  /**
43   * These tests try to analyze the transactional anomalies described by P. Baillis et al. in
44   * http://arxiv.org/pdf/1302.0309.pdf
45   *
46   * These tests try to model what project Hermitage is trying to do to compare the behavior of different DBMSs on these
47   * anomalies depending on the different isolation levels they offer. For more info on the Hermitage project, please
48   * refer to: https://github.com/ept/hermitage
49   *
50   * Transactional histories have been translated to HBase from the ones done for Postgresql in the Hermitage project:
51   * https://github.com/ept/hermitage/blob/master/postgres.md
52   *
53   * The "repeatable read" Postgresql isolation level is equivalent to "snapshot isolation", so we include the experiments
54   * for that isolation level
55   *
56   * With HBase 0.98 interfaces is not possible to execute updates/deletes based on predicates so the examples here are
57   * not exactly the same as in Postgres
58   */
59  @Test(groups = "sharedHBase")
60  public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
61  
62      private static final Logger LOG = getLogger(TestBaillisAnomaliesWithTXs.class);
63      private static final String TEST_COLUMN = "baillis-col";
64  
65  
66      // Data used in the tests
67      private byte[] famName = Bytes.toBytes(TEST_FAMILY);
68      private byte[] colName = Bytes.toBytes(TEST_COLUMN);
69  
70      private byte[] rowId1 = Bytes.toBytes("row1");
71      private byte[] rowId2 = Bytes.toBytes("row2");
72      private byte[] rowId3 = Bytes.toBytes("row3");
73  
74      private byte[] dataValue1 = Bytes.toBytes(10);
75      private byte[] dataValue2 = Bytes.toBytes(20);
76      private byte[] dataValue3 = Bytes.toBytes(30);
77  
78  
79      @Test(timeOut = 10_000)
80      public void testSIPreventsPredicateManyPrecedersForReadPredicates(ITestContext context) throws Exception {
81          // TX History for PMP for Read Predicate:
82          // begin; set transaction isolation level repeatable read; -- T1
83          // begin; set transaction isolation level repeatable read; -- T2
84          // select * from test where value = 30; -- T1. Returns nothing
85          // insert into test (id, value) values(3, 30); -- T2
86          // commit; -- T2
87          // select * from test where value % 3 = 0; -- T1. Still returns nothing
88          // commit; -- T1
89  
90          // 0) Start transactions
91          TransactionManager tm = newTransactionManager(context);
92          TTable txTable = new TTable(connection, TEST_TABLE);
93  
94          Transaction tx1 = tm.begin();
95          Transaction tx2 = tm.begin();
96  
97          // 1) select * from test where value = 30; -- T1. Returns nothing
98          Scan scan = new Scan();
99          Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(30));
100         scan.setFilter(f);
101         ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
102         assertNull(tx1Scanner.next());
103 
104         // 2) insert into test (id, value) values(3, 30); -- T2
105         Put newRow = new Put(rowId3);
106         newRow.addColumn(famName, colName, dataValue3);
107         txTable.put(tx2, newRow);
108 
109         // 3) Commit TX 2
110         tm.commit(tx2);
111 
112         // 4) select * from test where value % 3 = 0; -- T1. Still returns nothing
113         tx1Scanner = txTable.getScanner(tx1, scan);
114         assertNull(tx1Scanner.next());
115 
116         // 5) Commit TX 1
117         tm.commit(tx1);
118     }
119 
120     @Test(timeOut = 10_000)
121     public void testSIPreventsPredicateManyPrecedersForWritePredicates(ITestContext context) throws Exception {
122         // TX History for PMP for Write Predicate:
123         // begin; set transaction isolation level repeatable read; -- T1
124         // begin; set transaction isolation level repeatable read; -- T2
125         // update test set value = value + 10; -- T1
126         // delete from test where value = 20; -- T2, BLOCKS
127         // commit; -- T1. T2 now prints out "ERROR: could not serialize access due to concurrent update"
128         // abort; -- T2. There's nothing else we can do, this transaction has failed
129 
130         // 0) Start transactions
131         TransactionManager tm = newTransactionManager(context);
132         TTable txTable = new TTable(connection, TEST_TABLE);
133         Transaction tx1 = tm.begin();
134         Transaction tx2 = tm.begin();
135 
136         // 1) update test set value = value + 10; -- T1
137         Scan updateScan = new Scan();
138         ResultScanner tx1Scanner = txTable.getScanner(tx2, updateScan);
139         Result updateRes = tx1Scanner.next();
140         int count = 0;
141         while (updateRes != null) {
142             LOG.info("RESSS {}", updateRes);
143             Put row = new Put(updateRes.getRow());
144             int val = Bytes.toInt(updateRes.getValue(famName, colName));
145             LOG.info("Updating row id {} with value {}", Bytes.toString(updateRes.getRow()), val);
146             row.addColumn(famName, colName, Bytes.toBytes(val + 10));
147             txTable.put(tx1, row);
148             updateRes = tx1Scanner.next();
149             count++;
150         }
151         assertEquals(count, 2);
152 
153         // 2) delete from test where value = 20; -- T2, BLOCKS
154         Scan scan = new Scan();
155         Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
156         scan.setFilter(f);
157         ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
158         // assertEquals(tx2Scanner.next(100).length, 1);
159         Result res = tx2Scanner.next();
160         int count20 = 0;
161         while (res != null) {
162             LOG.info("RESSS {}", res);
163             LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()),
164                      Bytes.toInt(res.getValue(famName, colName)));
165             Delete delete20 = new Delete(res.getRow());
166             txTable.delete(tx2, delete20);
167             res = tx2Scanner.next();
168             count20++;
169         }
170         assertEquals(count20, 1);
171         // 3) commit TX 1
172         try {
173             tm.commit(tx1);
174         } catch (RollbackException e) {
175             if (!getClient(context).isLowLatency())
176                 fail();
177         }
178 
179         tx2Scanner = txTable.getScanner(tx2, scan);
180         //If we are in low latency mode, tx1 aborted and deleted the val=30, so scan will return row2
181         if (!getClient(context).isLowLatency())
182             assertNull(tx2Scanner.next());
183 
184         // 4) commit TX 2 -> Should be rolled-back
185         try {
186             tm.commit(tx2);
187             fail();
188         } catch (RollbackException e) {
189             // Expected
190         }
191 
192     }
193 
194     @Test(timeOut = 10_000)
195     public void testSIPreventsLostUpdates(ITestContext context) throws Exception {
196         // TX History for P4:
197         // begin; set transaction isolation level repeatable read; -- T1
198         // begin; set transaction isolation level repeatable read; -- T2
199         // select * from test where id = 1; -- T1
200         // select * from test where id = 1; -- T2
201         // update test set value = 11 where id = 1; -- T1
202         // update test set value = 11 where id = 1; -- T2, BLOCKS
203         // commit; -- T1. T2 now prints out "ERROR: could not serialize access due to concurrent update"
204         // abort;  -- T2. There's nothing else we can do, this transaction has failed
205 
206         // 0) Start transactions
207         TransactionManager tm = newTransactionManager(context);
208         TTable txTable = new TTable(connection, TEST_TABLE);
209         Transaction tx1 = tm.begin();
210         Transaction tx2 = tm.begin();
211 
212         Scan scan = new Scan(rowId1, rowId1);
213         scan.addColumn(famName, colName);
214 
215         // 1) select * from test where id = 1; -- T1
216         ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
217         Result res = tx1Scanner.next();
218         int count = 0;
219         while (res != null) {
220             LOG.info("RESSS {}", res);
221             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
222                      Bytes.toString(res.getValue(famName, colName)));
223             assertEquals(res.getRow(), rowId1);
224             assertEquals(res.getValue(famName, colName), dataValue1);
225             res = tx1Scanner.next();
226             count++;
227         }
228         assertEquals(count, 1);
229 
230         // 2) select * from test where id = 1; -- T2
231         ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
232         res = tx2Scanner.next();
233         count = 0;
234         while (res != null) {
235             LOG.info("RESSS {}", res);
236             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
237                      Bytes.toString(res.getValue(famName, colName)));
238             assertEquals(res.getRow(), rowId1);
239             assertEquals(res.getValue(famName, colName), dataValue1);
240             res = tx2Scanner.next();
241             count++;
242         }
243         assertEquals(count, 1);
244 
245         // 3) update test set value = 11 where id = 1; -- T1
246         Put updateRow1Tx1 = new Put(rowId1);
247         updateRow1Tx1.addColumn(famName, colName, Bytes.toBytes("11"));
248         txTable.put(tx1, updateRow1Tx1);
249 
250         // 4) update test set value = 11 where id = 1; -- T2
251         Put updateRow1Tx2 = new Put(rowId1);
252         updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes("11"));
253         txTable.put(tx2, updateRow1Tx2);
254 
255         // 5) commit -- T1
256         tm.commit(tx1);
257 
258         // 6) commit -- T2 --> should be rolled-back
259         try {
260             tm.commit(tx2);
261             fail();
262         } catch (RollbackException e) {
263             // Expected
264         }
265 
266     }
267 
268     @Test(timeOut = 10_000)
269     public void testSIPreventsReadSkew(ITestContext context) throws Exception {
270         // TX History for G-single:
271         // begin; set transaction isolation level repeatable read; -- T1
272         // begin; set transaction isolation level repeatable read; -- T2
273         // select * from test where id = 1; -- T1. Shows 1 => 10
274         // select * from test where id = 1; -- T2
275         // select * from test where id = 2; -- T2
276         // update test set value = 12 where id = 1; -- T2
277         // update test set value = 18 where id = 2; -- T2
278         // commit; -- T2
279         // select * from test where id = 2; -- T1. Shows 2 => 20
280         // commit; -- T1
281 
282         // 0) Start transactions
283         TransactionManager tm = newTransactionManager(context);
284         TTable txTable = new TTable(connection, TEST_TABLE);
285         Transaction tx1 = tm.begin();
286         Transaction tx2 = tm.begin();
287 
288         Scan rowId1Scan = new Scan(rowId1, rowId1);
289         rowId1Scan.addColumn(famName, colName);
290 
291         // 1) select * from test where id = 1; -- T1. Shows 1 => 10
292         ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId1Scan);
293         Result res = tx1Scanner.next();
294         int count = 0;
295         while (res != null) {
296             LOG.info("RESSS {}", res);
297             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
298                      Bytes.toString(res.getValue(famName, colName)));
299             assertEquals(res.getRow(), rowId1);
300             assertEquals(res.getValue(famName, colName), dataValue1);
301             res = tx1Scanner.next();
302             count++;
303         }
304         assertEquals(count, 1);
305 
306         // 2) select * from test where id = 1; -- T2
307         ResultScanner tx2Scanner = txTable.getScanner(tx2, rowId1Scan);
308         res = tx2Scanner.next();
309         count = 0;
310         while (res != null) {
311             LOG.info("RESSS {}", res);
312             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
313                      Bytes.toString(res.getValue(famName, colName)));
314             assertEquals(res.getRow(), rowId1);
315             assertEquals(res.getValue(famName, colName), dataValue1);
316             res = tx2Scanner.next();
317             count++;
318         }
319 
320         Scan rowId2Scan = new Scan(rowId2, rowId2);
321         rowId2Scan.addColumn(famName, colName);
322 
323         // 3) select * from test where id = 2; -- T2
324         tx2Scanner = txTable.getScanner(tx2, rowId2Scan);
325         res = tx2Scanner.next();
326         count = 0;
327         while (res != null) {
328             LOG.info("RESSS {}", res);
329             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
330                      Bytes.toString(res.getValue(famName, colName)));
331             assertEquals(res.getRow(), rowId2);
332             assertEquals(res.getValue(famName, colName), dataValue2);
333             res = tx2Scanner.next();
334             count++;
335         }
336 
337         // 4) update test set value = 12 where id = 1; -- T2
338         Put updateRow1Tx2 = new Put(rowId1);
339         updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes("12"));
340         txTable.put(tx1, updateRow1Tx2);
341 
342         // 5) update test set value = 18 where id = 1; -- T2
343         Put updateRow2Tx2 = new Put(rowId2);
344         updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes("18"));
345         txTable.put(tx2, updateRow2Tx2);
346 
347         // 6) commit -- T2
348         tm.commit(tx2);
349 
350         // 7) select * from test where id = 2; -- T1. Shows 2 => 20
351         tx1Scanner = txTable.getScanner(tx1, rowId2Scan);
352         res = tx1Scanner.next();
353         count = 0;
354         while (res != null) {
355             LOG.info("RESSS {}", res);
356             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
357                      Bytes.toString(res.getValue(famName, colName)));
358             assertEquals(res.getRow(), rowId2);
359             assertEquals(res.getValue(famName, colName), dataValue2);
360             res = tx1Scanner.next();
361             count++;
362         }
363 
364         // 8) commit -- T1
365         tm.commit(tx1);
366 
367     }
368 
369     @Test(timeOut = 10_000)
370     public void testSIPreventsReadSkewUsingWritePredicate(ITestContext context) throws Exception {
371         // TX History for G-single:
372         // begin; set transaction isolation level repeatable read; -- T1
373         // begin; set transaction isolation level repeatable read; -- T2
374         // select * from test where id = 1; -- T1. Shows 1 => 10
375         // select * from test; -- T2
376         // update test set value = 12 where id = 1; -- T2
377         // update test set value = 18 where id = 2; -- T2
378         // commit; -- T2
379         // delete from test where value = 20; -- T1. Prints "ERROR: could not serialize access due to concurrent update"
380         // abort; -- T1. There's nothing else we can do, this transaction has failed
381 
382         // 0) Start transactions
383         TransactionManager tm = newTransactionManager(context);
384         TTable txTable = new TTable(connection, TEST_TABLE);
385         Transaction tx1 = tm.begin();
386         Transaction tx2 = tm.begin();
387 
388         // 1) select * from test; -- T1
389         assertNumberOfRows(txTable, tx1, 2, new Scan());
390 
391         // 2) select * from test; -- T2
392         assertNumberOfRows(txTable, tx2, 2, new Scan());
393 
394         // 3) update test set value = 12 where id = 1; -- T2
395         // 4) update test set value = 18 where id = 2; -- T2
396         Put updateRow1Tx2 = new Put(rowId1);
397         updateRow1Tx2.addColumn(famName, colName, Bytes.toBytes(12));
398         Put updateRow2Tx2 = new Put(rowId2);
399         updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes(18));
400         txTable.put(tx2, Arrays.asList(updateRow1Tx2, updateRow2Tx2));
401 
402         // 5) commit; -- T2
403         tm.commit(tx2);
404 
405         // 6) delete from test where value = 20; -- T1. Prints
406         // "ERROR: could not serialize access due to concurrent update"
407         Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
408         Scan checkFor20 = new Scan();
409         checkFor20.setFilter(f);
410         ResultScanner checkFor20Scanner = txTable.getScanner(tx1, checkFor20);
411         Result res = checkFor20Scanner.next();
412         while (res != null) {
413             LOG.info("RESSS {}", res);
414             LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
415             Delete delete20 = new Delete(res.getRow());
416             txTable.delete(tx1, delete20);
417             res = checkFor20Scanner.next();
418         }
419 
420         // 7) abort; -- T1
421         try {
422             tm.commit(tx1);
423             fail("Should be aborted");
424         } catch (RollbackException e) {
425             // Expected
426         }
427 
428     }
429 
430     // this test shows that Omid does not provide serilizable level of isolation other wise last commit would have failed
431     @Test(timeOut = 10_000)
432     public void testSIDoesNotPreventWriteSkew(ITestContext context) throws Exception {
433         // TX History for G2-item:
434         // begin; set transaction isolation level repeatable read; -- T1
435         // begin; set transaction isolation level repeatable read; -- T2
436         // select * from test where id in (1,2); -- T1
437         // select * from test where id in (1,2); -- T2
438         // update test set value = 11 where id = 1; -- T1
439         // update test set value = 21 where id = 2; -- T2
440         // commit; -- T1
441         // commit; -- T2
442 
443         // 0) Start transactions
444         TransactionManager tm = newTransactionManager(context);
445         TTable txTable = new TTable(connection, TEST_TABLE);
446         Transaction tx1 = tm.begin();
447         Transaction tx2 = tm.begin();
448 
449         Scan rowId12Scan = new Scan(rowId1, rowId3);
450         rowId12Scan.addColumn(famName, colName);
451 
452         // 1) select * from test where id in (1,2); -- T1
453         ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId12Scan);
454         Result res = tx1Scanner.next();
455         int count = 0;
456         while (res != null) {
457             LOG.info("RESSS {}", res);
458             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
459             switch (count) {
460                 case 0:
461                     assertEquals(res.getRow(), rowId1);
462                     assertEquals(res.getValue(famName, colName), dataValue1);
463                     break;
464                 case 1:
465                     assertEquals(res.getRow(), rowId2);
466                     assertEquals(res.getValue(famName, colName), dataValue2);
467                     break;
468                 default:
469                     fail();
470             }
471             res = tx1Scanner.next();
472             count++;
473         }
474         assertEquals(count, 2);
475 
476         // 2) select * from test where id in (1,2); -- T2
477         ResultScanner tx2Scanner = txTable.getScanner(tx1, rowId12Scan);
478         res = tx2Scanner.next();
479         count = 0;
480         while (res != null) {
481             LOG.info("RESSS {}", res);
482             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
483             switch (count) {
484                 case 0:
485                     assertEquals(res.getRow(), rowId1);
486                     assertEquals(res.getValue(famName, colName), dataValue1);
487                     break;
488                 case 1:
489                     assertEquals(res.getRow(), rowId2);
490                     assertEquals(res.getValue(famName, colName), dataValue2);
491                     break;
492                 default:
493                     fail();
494             }
495             res = tx2Scanner.next();
496             count++;
497         }
498         assertEquals(count, 2);
499 
500         // 3) update test set value = 11 where id = 1; -- T1
501         Put updateRow1Tx1 = new Put(rowId1);
502         updateRow1Tx1.addColumn(famName, colName, Bytes.toBytes("11"));
503         txTable.put(tx1, updateRow1Tx1);
504 
505         // 4) update test set value = 21 where id = 2; -- T2
506         Put updateRow2Tx2 = new Put(rowId2);
507         updateRow2Tx2.addColumn(famName, colName, Bytes.toBytes("21"));
508         txTable.put(tx2, updateRow2Tx2);
509 
510         // 5) commit; -- T1
511         tm.commit(tx1);
512 
513         // 6) commit; -- T2
514         tm.commit(tx2);
515     }
516 
517     // this test shows that Omid does not provide serilizable level of isolation other wise last commit would have failed
518     @Test(timeOut = 10_000)
519     public void testSIDoesNotPreventAntiDependencyCycles(ITestContext context) throws Exception {
520         // TX History for G2:
521         // begin; set transaction isolation level repeatable read; -- T1
522         // begin; set transaction isolation level repeatable read; -- T2
523         // select * from test where value % 3 = 0; -- T1
524         // select * from test where value % 3 = 0; -- T2
525         // insert into test (id, value) values(3, 30); -- T1
526         // insert into test (id, value) values(4, 42); -- T2
527         // commit; -- T1
528         // commit; -- T2
529         // select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 4 => 42
530 
531         // 0) Start transactions
532         TransactionManager tm = newTransactionManager(context);
533         TTable txTable = new TTable(connection, TEST_TABLE);
534         Transaction tx1 = tm.begin();
535         Transaction tx2 = tm.begin();
536 
537         Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes("30"));
538         Scan value30 = new Scan();
539         value30.setFilter(f);
540         value30.addColumn(famName, colName);
541 
542         // 1) select * from test where value % 3 = 0; -- T1
543         assertNumberOfRows(txTable, tx1, 0, value30);
544 
545 
546         // 2) select * from test where value % 3 = 0; -- T2
547         assertNumberOfRows(txTable, tx2, 0, value30);
548 
549 
550         // 3) insert into test (id, value) values(3, 30); -- T1
551         Put insertRow3Tx1 = new Put(rowId1);
552         insertRow3Tx1.addColumn(famName, colName, Bytes.toBytes("30"));
553         txTable.put(tx1, insertRow3Tx1);
554 
555         // 4) insert into test (id, value) values(4, 42); -- T2
556         Put updateRow4Tx2 = new Put(rowId2);
557         updateRow4Tx2.addColumn(famName, colName, Bytes.toBytes("42"));
558         txTable.put(tx2, updateRow4Tx2);
559 
560         // 5) commit; -- T1
561         tm.commit(tx1);
562 
563         // 6) commit; -- T2
564         tm.commit(tx2);
565 
566         // 7) select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 4 => 42
567     }
568 
569     /**
570      * This translates the table initialization done in:
571      * https://github.com/ept/hermitage/blob/master/postgres.md
572      *
573      * create table test (id int primary key, value int);
574      * insert into test (id, value) values (1, 10), (2, 20);
575      */
576     @BeforeMethod(alwaysRun = true)
577     private void loadBaseDataOnTestTable(ITestContext context) throws Exception {
578 
579         TransactionManager tm = newTransactionManager(context);
580         TTable txTable = new TTable(connection, TEST_TABLE);
581 
582         Transaction initializationTx = tm.begin();
583         Put row1 = new Put(rowId1);
584         row1.addColumn(famName, colName, dataValue1);
585         txTable.put(initializationTx, row1);
586         Put row2 = new Put(rowId2);
587         row2.addColumn(famName, colName, dataValue2);
588         txTable.put(initializationTx, row2);
589 
590         tm.commit(initializationTx);
591     }
592 
593 
594     private void assertNumberOfRows(TTable txTable, Transaction tx2, int maxCount, Scan scan) throws IOException {
595         int count = 0;
596         ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
597         Result res = tx2Scanner.next();
598         while (res != null) {
599             LOG.info("RESSS {}", res);
600             LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
601             res = tx2Scanner.next();
602             count++;
603         }
604         assertEquals(count, maxCount);
605     }
606 
607 
608 }