View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.junit.Assert.fail;
21  import static org.testng.Assert.assertEquals;
22  import static org.testng.Assert.assertTrue;
23  
24  import org.apache.hadoop.hbase.client.Get;
25  import org.apache.hadoop.hbase.client.Put;
26  import org.apache.hadoop.hbase.client.Result;
27  import org.apache.hadoop.hbase.client.ResultScanner;
28  import org.apache.hadoop.hbase.client.Scan;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  import org.testng.ITestContext;
33  import org.testng.annotations.Test;
34  
35  @Test(groups = "sharedHBase")
36  public class TestBasicTransaction extends OmidTestBase {
37  
38      private static final Logger LOG = LoggerFactory.getLogger(TestBasicTransaction.class);
39  
40  
41      @Test(timeOut = 30_000)
42      public void testTimestampsOfTwoRowsInstertedAfterCommitOfSingleTransactionAreEquals(ITestContext context) throws Exception {
43  
44          TransactionManager tm = newTransactionManager(context);
45          TTable tt = new TTable(connection, TEST_TABLE);
46  
47          byte[] rowName1 = Bytes.toBytes("row1");
48          byte[] rowName2 = Bytes.toBytes("row2");
49          byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
50          byte[] colName1 = Bytes.toBytes("col1");
51          byte[] dataValue1 = Bytes.toBytes("testWrite-1");
52          byte[] dataValue2 = Bytes.toBytes("testWrite-2");
53  
54          Transaction tx1 = tm.begin();
55  
56          Put row1 = new Put(rowName1);
57          row1.addColumn(famName1, colName1, dataValue1);
58          tt.put(tx1, row1);
59          Put row2 = new Put(rowName2);
60          row2.addColumn(famName1, colName1, dataValue2);
61          tt.put(tx1, row2);
62  
63          tm.commit(tx1);
64  
65          tt.close();
66  
67          // Checks
68          Get getResultRow1 = new Get(rowName1).setMaxVersions(1);
69          Result result1 = tt.getHTable().get(getResultRow1);
70          byte[] val1 = result1.getValue(famName1, colName1);
71          assertTrue(Bytes.equals(dataValue1, result1.getValue(famName1, colName1)),
72                  "Unexpected value for row 1 in col 1: " + Bytes.toString(val1));
73          long tsRow1 = result1.rawCells()[0].getTimestamp();
74  
75          Get getResultRow2 = new Get(rowName2).setMaxVersions(1);
76          Result result2 = tt.getHTable().get(getResultRow2);
77          byte[] val2 = result2.getValue(famName1, colName1);
78          assertTrue(Bytes.equals(dataValue2, result2.getValue(famName1, colName1)),
79                  "Unexpected value for row 2 in col 1: " + Bytes.toString(val2));
80          long tsRow2 = result2.rawCells()[0].getTimestamp();
81  
82          assertEquals(tsRow2, tsRow1, "Timestamps of row 1 and row 2 are different");
83  
84      }
85  
86      @Test(timeOut = 30_000)
87      public void testTimestampsOfTwoRowsModifiedByTwoSequentialTransactionsAreEqualAndHaveBeenIncreasedMonotonically(ITestContext context)
88              throws Exception {
89  
90          TransactionManager tm = newTransactionManager(context);
91          TTable tt = new TTable(connection, TEST_TABLE);
92  
93          byte[] rowName1 = Bytes.toBytes("row1");
94          byte[] rowName2 = Bytes.toBytes("row2");
95          byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
96          byte[] colName1 = Bytes.toBytes("col1");
97          byte[] dataValue1 = Bytes.toBytes("testWrite-1");
98          byte[] dataValue2 = Bytes.toBytes("testWrite-2");
99  
100         byte[] dataValue3 = Bytes.toBytes("testWrite-3");
101         byte[] dataValue4 = Bytes.toBytes("testWrite-4");
102 
103         Transaction tx1 = tm.begin();
104 
105         Put row1 = new Put(rowName1);
106         row1.addColumn(famName1, colName1, dataValue1);
107         tt.put(tx1, row1);
108         Put row2 = new Put(rowName2);
109         row2.addColumn(famName1, colName1, dataValue2);
110         tt.put(tx1, row2);
111 
112         tm.commit(tx1);
113 
114         Transaction tx2 = tm.begin();
115 
116         row1 = new Put(rowName1);
117         row1.addColumn(famName1, colName1, dataValue3);
118         tt.put(tx2, row1);
119         row2 = new Put(rowName2);
120         row2.addColumn(famName1, colName1, dataValue4);
121         tt.put(tx2, row2);
122 
123         tm.commit(tx2);
124 
125         tt.close();
126 
127         // Checks
128         Get getResultRow1 = new Get(rowName1).setMaxVersions(2);
129         Result result1 = tt.getHTable().get(getResultRow1);
130         byte[] val1 = result1.getValue(famName1, colName1);
131         assertTrue(Bytes.equals(dataValue3, result1.getValue(famName1, colName1)),
132                 "Unexpected value for row 1 in col 1: " + Bytes.toString(val1));
133 
134         long lastTsRow1 = result1.rawCells()[0].getTimestamp();
135         long previousTsRow1 = result1.rawCells()[1].getTimestamp();
136 
137         Get getResultRow2 = new Get(rowName2).setMaxVersions(2);
138         Result result2 = tt.getHTable().get(getResultRow2);
139         byte[] val2 = result2.getValue(famName1, colName1);
140         assertTrue(Bytes.equals(dataValue4, result2.getValue(famName1, colName1)),
141                 "Unexpected value for row 2 in col 1: " + Bytes.toString(val2));
142 
143         long lastTsRow2 = result2.rawCells()[0].getTimestamp();
144         long previousTsRow2 = result2.rawCells()[1].getTimestamp();
145 
146         assertTrue(lastTsRow1 == lastTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different");
147         assertTrue(previousTsRow1 == previousTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different");
148         assertTrue(lastTsRow1 > previousTsRow1, "Timestamp assigned by Tx2 to row 1 hasn't increased monotonically");
149         assertTrue(lastTsRow2 > previousTsRow2, "Timestamp assigned by Tx2 to row 2 hasn't increased monotonically");
150 
151     }
152 
153     @Test(timeOut = 30_000)
154     public void runTestSimple(ITestContext context) throws Exception {
155 
156         TransactionManager tm = newTransactionManager(context);
157 
158         TTable tt = new TTable(connection, TEST_TABLE);
159 
160         Transaction t1 = tm.begin();
161         LOG.info("Transaction created " + t1);
162 
163         byte[] row = Bytes.toBytes("test-simple");
164         byte[] fam = Bytes.toBytes(TEST_FAMILY);
165         byte[] col = Bytes.toBytes("testdata");
166         byte[] data1 = Bytes.toBytes("testWrite-1");
167         byte[] data2 = Bytes.toBytes("testWrite-2");
168 
169         Put p = new Put(row);
170         p.addColumn(fam, col, data1);
171         tt.put(t1, p);
172         tm.commit(t1);
173 
174         Transaction tread = tm.begin();
175         Transaction t2 = tm.begin();
176         p = new Put(row);
177         p.addColumn(fam, col, data2);
178         tt.put(t2, p);
179         tm.commit(t2);
180 
181         Get g = new Get(row).setMaxVersions(1);
182         Result r = tt.getHTable().get(g);
183         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
184                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
185 
186         r = tt.get(tread, g);
187         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
188                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
189     }
190 
191     @Test(timeOut = 30_000)
192     public void runTestManyVersions(ITestContext context) throws Exception {
193 
194         TransactionManager tm = newTransactionManager(context);
195         TTable tt = new TTable(connection, TEST_TABLE);
196 
197         Transaction t1 = tm.begin();
198         LOG.info("Transaction created " + t1);
199 
200         byte[] row = Bytes.toBytes("test-simple");
201         byte[] fam = Bytes.toBytes(TEST_FAMILY);
202         byte[] col = Bytes.toBytes("testdata");
203         byte[] data1 = Bytes.toBytes("testWrite-1");
204         byte[] data2 = Bytes.toBytes("testWrite-2");
205 
206         Put p = new Put(row);
207         p.addColumn(fam, col, data1);
208         tt.put(t1, p);
209         tm.commit(t1);
210 
211         for (int i = 0; i < 5; ++i) {
212             Transaction t2 = tm.begin();
213             p = new Put(row);
214             p.addColumn(fam, col, data2);
215             tt.put(t2, p);
216         }
217         Transaction tread = tm.begin();
218 
219         Get g = new Get(row).setMaxVersions(1);
220         Result r = tt.getHTable().get(g);
221         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
222                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
223 
224         r = tt.get(tread, g);
225         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
226                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
227 
228     }
229 
230     @Test(timeOut = 30_000)
231     public void runTestInterleave(ITestContext context) throws Exception {
232 
233         TransactionManager tm = newTransactionManager(context);
234         TTable tt = new TTable(connection, TEST_TABLE);
235 
236         Transaction t1 = tm.begin();
237         LOG.info("Transaction created " + t1);
238 
239         byte[] row = Bytes.toBytes("test-interleave");
240         byte[] fam = Bytes.toBytes(TEST_FAMILY);
241         byte[] col = Bytes.toBytes("testdata");
242         byte[] data1 = Bytes.toBytes("testWrite-1");
243         byte[] data2 = Bytes.toBytes("testWrite-2");
244 
245         Put p = new Put(row);
246         p.addColumn(fam, col, data1);
247         tt.put(t1, p);
248         tm.commit(t1);
249 
250         Transaction t2 = tm.begin();
251         p = new Put(row);
252         p.addColumn(fam, col, data2);
253         tt.put(t2, p);
254 
255         Transaction tread = tm.begin();
256         Get g = new Get(row).setMaxVersions(1);
257         Result r = tt.get(tread, g);
258         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
259                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
260         try {
261             tm.commit(t2);
262         } catch (RollbackException e) {
263             if (!getClient(context).isLowLatency())
264                 fail();
265             return;
266         }
267         r = tt.getHTable().get(g);
268         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
269                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
270 
271     }
272 
273     @Test(expectedExceptions = IllegalArgumentException.class, timeOut = 30_000)
274     public void testSameCommitRaisesException(ITestContext context) throws Exception {
275         TransactionManager tm = newTransactionManager(context);
276 
277         Transaction t1 = tm.begin();
278         tm.commit(t1);
279         tm.commit(t1);
280     }
281 
282     @Test(timeOut = 30_000)
283     public void testInterleavedScanReturnsTheRightSnapshotResults(ITestContext context) throws Exception {
284 
285         TransactionManager tm = newTransactionManager(context);
286         TTable txTable = new TTable(connection, TEST_TABLE);
287 
288         // Basic data-scaffolding for test
289         byte[] fam = Bytes.toBytes(TEST_FAMILY);
290         byte[] col = Bytes.toBytes("TEST_COL");
291         byte[] data1 = Bytes.toBytes("testWrite-1");
292         byte[] data2 = Bytes.toBytes("testWrite-2");
293 
294         byte[] startRow = Bytes.toBytes("row-to-scan" + 0);
295         byte[] stopRow = Bytes.toBytes("row-to-scan" + 9);
296         byte[] randomRow = Bytes.toBytes("row-to-scan" + 3);
297 
298         // Add some data transactionally to have an initial state for the test
299         Transaction tx1 = tm.begin();
300         for (int i = 0; i < 10; i++) {
301             byte[] row = Bytes.toBytes("row-to-scan" + i);
302 
303             Put p = new Put(row);
304             p.addColumn(fam, col, data1);
305             txTable.put(tx1, p);
306         }
307         tm.commit(tx1);
308 
309         // Start a second transaction -Tx2- modifying a random row and check that a concurrent transactional context
310         // that scans the table, gets the proper snapshot with the stuff written by Tx1
311         Transaction tx2 = tm.begin();
312         Put p = new Put(randomRow);
313         p.addColumn(fam, col, data2);
314         txTable.put(tx2, p);
315 
316         Transaction scanTx = tm.begin(); // This is the concurrent transactional scanner
317         ResultScanner rs = txTable.getScanner(scanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
318         Result r = rs.next(); // Exercise the next() method
319         int i = 0;
320         while (r != null) {
321             LOG.trace("Scan (" + ++i + ")" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col)));
322             assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
323                     "Unexpected value for SI scan " + scanTx + ": " + Bytes.toString(r.getValue(fam, col)));
324             r = rs.next();
325         }
326 
327         // Commit the Tx2 and then check that under a new transactional context, the scanner gets the right snapshot,
328         // which must include the row modified by Tx2
329         if (getClient(context).isLowLatency()) {
330             //No point going on from here, tx2 is going to be invalidated and modified wil be 0
331             return;
332         }
333 
334         tm.commit(tx2);
335 
336         int modifiedRows = 0;
337         Transaction newScanTx = tm.begin();
338         ResultScanner newRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
339         Result[] results = newRS.next(10); // Exercise the next(numRows) method
340         for (Result result : results) {
341             if (Bytes.equals(data2, result.getValue(fam, col))) {
342                 LOG.trace("Modified :" + Bytes.toString(result.getRow()));
343                 modifiedRows++;
344             }
345         }
346         assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
347 
348         // Same check as before but checking that the results are correct when retrieved through the Scanner Iterator
349         modifiedRows = 0;
350         ResultScanner iterableRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
351         for (Result res : iterableRS) {
352             if (Bytes.equals(data2, res.getValue(fam, col))) {
353                 LOG.trace("Modified :" + Bytes.toString(res.getRow()));
354                 modifiedRows++;
355             }
356         }
357 
358         assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
359 
360         // Finally, check that the Scanner Iterator does not implement the remove method
361         try {
362             iterableRS.iterator().remove();
363             fail();
364         } catch (RuntimeException re) {
365             // Expected
366         }
367 
368     }
369 
370     @Test(timeOut = 30_000)
371     public void testInterleavedScanReturnsTheRightSnapshotResultsWhenATransactionAborts(ITestContext context)
372             throws Exception {
373 
374         TransactionManager tm = newTransactionManager(context);
375         TTable txTable = new TTable(connection, TEST_TABLE);
376 
377         // Basic data-scaffolding for test
378         byte[] fam = Bytes.toBytes(TEST_FAMILY);
379         byte[] col = Bytes.toBytes("TEST_COL");
380         byte[] data1 = Bytes.toBytes("testWrite-1");
381         byte[] data2 = Bytes.toBytes("testWrite-2");
382 
383         byte[] startRow = Bytes.toBytes("row-to-scan" + 0);
384         byte[] stopRow = Bytes.toBytes("row-to-scan" + 9);
385         byte[] randomRow = Bytes.toBytes("row-to-scan" + 3);
386 
387         // Add some data transactionally to have an initial state for the test
388         Transaction tx1 = tm.begin();
389         for (int i = 0; i < 10; i++) {
390             byte[] row = Bytes.toBytes("row-to-scan" + i);
391 
392             Put p = new Put(row);
393             p.addColumn(fam, col, data1);
394             txTable.put(tx1, p);
395         }
396         tm.commit(tx1);
397 
398         // Start a second transaction modifying a random row and check that a transactional scanner in Tx2 gets the
399         // right snapshot with the new value in the random row just written by Tx2
400         Transaction tx2 = tm.begin();
401         Put p = new Put(randomRow);
402         p.addColumn(fam, col, data2);
403         txTable.put(tx2, p);
404 
405         int modifiedRows = 0;
406         ResultScanner rs = txTable.getScanner(tx2, new Scan().setStartRow(startRow).setStopRow(stopRow));
407         Result r = rs.next();
408         while (r != null) {
409             if (Bytes.equals(data2, r.getValue(fam, col))) {
410                 LOG.trace("Modified :" + Bytes.toString(r.getRow()));
411                 modifiedRows++;
412             }
413 
414             r = rs.next();
415         }
416 
417         assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
418 
419         // Rollback the second transaction and then check that under a new transactional scanner we get the snapshot
420         // that includes the only the initial rows put by Tx1
421         tm.rollback(tx2);
422 
423         Transaction txScan = tm.begin();
424         rs = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow));
425         r = rs.next();
426         while (r != null) {
427             LOG.trace("Scan1 :" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col)));
428             assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
429                     "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(r.getValue(fam, col)));
430             r = rs.next();
431         }
432 
433         // Same check as before but checking that the results are correct when retrieved through the Scanner Iterator
434         ResultScanner iterableRS = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow));
435         for (Result result : iterableRS) {
436             assertTrue(Bytes.equals(data1, result.getValue(fam, col)),
437                     "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(result.getValue(fam, col)));
438         }
439 
440         // Finally, check that the Scanner Iterator does not implement the remove method
441         try {
442             iterableRS.iterator().remove();
443             fail();
444         } catch (RuntimeException re) {
445             // Expected
446         }
447 
448     }
449 
450     @Test(timeOut = 30_000)
451     public void testAutoCommit(ITestContext context)
452             throws Exception {
453 
454         TransactionManager tm = newTransactionManager(context);
455         TTable tt = new TTable(connection, TEST_TABLE);
456 
457         byte[] rowName1 = Bytes.toBytes("row1");
458         byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
459         byte[] colName1 = Bytes.toBytes("col1");
460         byte[] dataValue1 = Bytes.toBytes("testWrite-1");
461 
462         Transaction tx1 = tm.begin();
463 
464         Put row1 = new Put(rowName1);
465         row1.addColumn(famName1, colName1, dataValue1);
466         tt.put(tx1, row1);
467 
468         Transaction tx2 = tm.begin();
469 
470         Transaction tx3 = tm.begin();
471 
472         Get g = new Get(rowName1).setMaxVersions();
473         g.addColumn(famName1, colName1);
474         Result r = tt.get(tx3, g);
475         assertEquals(r.size(), 0, "Unexpected size for read.");
476 
477         row1 = new Put(rowName1);
478         row1.addColumn(famName1, colName1, dataValue1);
479         tt.put(tx2, row1, true);
480 
481         r = tt.get(tx3, g);
482         assertEquals(r.size(), 1, "Unexpected size for read.");
483 
484         tt.close();
485     }
486 
487 }