View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.transaction;
19  
20  import static org.testng.Assert.assertEquals;
21  import static org.testng.Assert.assertTrue;
22  import static org.testng.Assert.fail;
23  
24  import org.apache.hadoop.hbase.HColumnDescriptor;
25  import org.apache.hadoop.hbase.HTableDescriptor;
26  import org.apache.hadoop.hbase.TableName;
27  import org.apache.hadoop.hbase.client.Admin;
28  import org.apache.hadoop.hbase.client.Connection;
29  import org.apache.hadoop.hbase.client.ConnectionFactory;
30  import org.apache.hadoop.hbase.client.Delete;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.Put;
33  import org.apache.hadoop.hbase.client.Result;
34  import org.apache.hadoop.hbase.client.ResultScanner;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.client.Table;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  import org.testng.ITestContext;
41  import org.testng.annotations.Test;
42  
43  import java.io.IOException;
44  
45  @Test(groups = "sharedHBase")
46  public class TestTransactionConflict extends OmidTestBase {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(TestTransactionConflict.class);
49  
50      @Test(timeOut = 10_000)
51      public void runTestWriteWriteConflict(ITestContext context) throws Exception {
52          TransactionManager tm = newTransactionManager(context);
53          TTable tt = new TTable(connection, TEST_TABLE);
54  
55          Transaction t1 = tm.begin();
56          LOG.info("Transaction created " + t1);
57  
58          Transaction t2 = tm.begin();
59          LOG.info("Transaction created" + t2);
60  
61          byte[] row = Bytes.toBytes("test-simple");
62          byte[] fam = Bytes.toBytes(TEST_FAMILY);
63          byte[] col = Bytes.toBytes("testdata");
64          byte[] data1 = Bytes.toBytes("testWrite-1");
65          byte[] data2 = Bytes.toBytes("testWrite-2");
66  
67          Put p = new Put(row);
68          p.addColumn(fam, col, data1);
69          tt.put(t1, p);
70  
71          Put p2 = new Put(row);
72          p2.addColumn(fam, col, data2);
73          tt.put(t2, p2);
74  
75          tm.commit(t2);
76  
77          try {
78              tm.commit(t1);
79              fail("Transaction should not commit successfully");
80          } catch (RollbackException e) {
81          }
82      }
83  
84      @Test(timeOut = 10_000)
85      public void runTestMultiTableConflict(ITestContext context) throws Exception {
86          TransactionManager tm = newTransactionManager(context);
87          TTable tt = new TTable(connection, TEST_TABLE);
88          String table2 = TEST_TABLE + 2;
89          TableName table2Name = TableName.valueOf(table2);
90  
91          try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
92               Admin admin = conn.getAdmin()) {
93              TableName htable2 = TableName.valueOf(table2);
94  
95              if (!admin.tableExists(htable2)) {
96                  HTableDescriptor desc = new HTableDescriptor(table2Name);
97                  HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
98                  datafam.setMaxVersions(Integer.MAX_VALUE);
99                  desc.addFamily(datafam);
100     
101                 admin.createTable(desc);
102             }
103     
104             if (admin.isTableDisabled(htable2)) {
105                 admin.enableTable(htable2);
106             }
107         }
108 
109         TTable tt2 = new TTable(connection, table2);
110 
111         Transaction t1 = tm.begin();
112         LOG.info("Transaction created " + t1);
113 
114         Transaction t2 = tm.begin();
115         LOG.info("Transaction created" + t2);
116 
117         byte[] row = Bytes.toBytes("test-simple");
118         byte[] row2 = Bytes.toBytes("test-simple2");
119         byte[] fam = Bytes.toBytes(TEST_FAMILY);
120         byte[] col = Bytes.toBytes("testdata");
121         byte[] data1 = Bytes.toBytes("testWrite-1");
122         byte[] data2 = Bytes.toBytes("testWrite-2");
123 
124         Put p = new Put(row);
125         p.addColumn(fam, col, data1);
126         tt.put(t1, p);
127         tt2.put(t1, p);
128 
129         Put p2 = new Put(row);
130         p2.addColumn(fam, col, data2);
131         tt.put(t2, p2);
132         p2 = new Put(row2);
133         p2.addColumn(fam, col, data2);
134         tt2.put(t2, p2);
135 
136         tm.commit(t2);
137 
138         boolean aborted = false;
139         try {
140             tm.commit(t1);
141             fail("Transaction commited successfully");
142         } catch (RollbackException e) {
143             aborted = true;
144         }
145         assertTrue(aborted, "Transaction didn't raise exception");
146 
147         ResultScanner rs = tt2.getHTable().getScanner(fam, col);
148 
149         int count = 0;
150         Result r;
151         while ((r = rs.next()) != null) {
152             count += r.size();
153         }
154         assertEquals(count, 1, "Should have cell");
155     }
156 
157     @Test(timeOut = 10_000)
158     public void runTestCleanupAfterConflict(ITestContext context) throws Exception {
159         TransactionManager tm = newTransactionManager(context);
160         TTable tt = new TTable(connection, TEST_TABLE);
161 
162         Transaction t1 = tm.begin();
163         LOG.info("Transaction created " + t1);
164 
165         Transaction t2 = tm.begin();
166         LOG.info("Transaction created" + t2);
167 
168         byte[] row = Bytes.toBytes("test-simple");
169         byte[] fam = Bytes.toBytes(TEST_FAMILY);
170         byte[] col = Bytes.toBytes("testdata");
171         byte[] data1 = Bytes.toBytes("testWrite-1");
172         byte[] data2 = Bytes.toBytes("testWrite-2");
173 
174         Put p = new Put(row);
175         p.addColumn(fam, col, data1);
176         tt.put(t1, p);
177 
178         Get g = new Get(row).setMaxVersions();
179         g.addColumn(fam, col);
180         Result r = tt.getHTable().get(g);
181         assertEquals(r.size(), 1, "Unexpected size for read.");
182         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
183                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
184 
185         Put p2 = new Put(row);
186         p2.addColumn(fam, col, data2);
187         tt.put(t2, p2);
188 
189         r = tt.getHTable().get(g);
190         assertEquals(r.size(), 2, "Unexpected size for read.");
191         r = tt.get(t2, g);
192         assertEquals(r.size(),1, "Unexpected size for read.");
193         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
194                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
195 
196         tm.commit(t1);
197 
198         boolean aborted = false;
199         try {
200             tm.commit(t2);
201             fail("Transaction commited successfully");
202         } catch (RollbackException e) {
203             aborted = true;
204         }
205         assertTrue(aborted, "Transaction didn't raise exception");
206 
207         r = tt.getHTable().get(g);
208         assertEquals(r.size(), 1, "Unexpected size for read.");
209         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
210                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
211     }
212 
213     @Test(timeOut = 10_000)
214     public void testCleanupWithDeleteRow(ITestContext context) throws Exception {
215 
216         TransactionManager tm = newTransactionManager(context);
217         TTable tt = new TTable(connection, TEST_TABLE);
218 
219         Transaction t1 = tm.begin();
220         LOG.info("Transaction created " + t1);
221 
222         int rowcount = 10;
223         int count = 0;
224 
225         byte[] fam = Bytes.toBytes(TEST_FAMILY);
226         byte[] col = Bytes.toBytes("testdata");
227         byte[] data1 = Bytes.toBytes("testWrite-1");
228         byte[] data2 = Bytes.toBytes("testWrite-2");
229 
230         byte[] modrow = Bytes.toBytes("test-del" + 3);
231         for (int i = 0; i < rowcount; i++) {
232             byte[] row = Bytes.toBytes("test-del" + i);
233 
234             Put p = new Put(row);
235             p.addColumn(fam, col, data1);
236             tt.put(t1, p);
237         }
238         tm.commit(t1);
239 
240         Transaction t2 = tm.begin();
241         LOG.info("Transaction created " + t2);
242         Delete d = new Delete(modrow);
243         tt.delete(t2, d);
244 
245         ResultScanner rs = tt.getScanner(t2, new Scan());
246         Result r = rs.next();
247         count = 0;
248         while (r != null) {
249             count++;
250             LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
251             r = rs.next();
252         }
253         assertEquals(count, rowcount - 1, "Wrong count");
254 
255         Transaction t3 = tm.begin();
256         LOG.info("Transaction created " + t3);
257         Put p = new Put(modrow);
258         p.addColumn(fam, col, data2);
259         tt.put(t3, p);
260 
261         tm.commit(t3);
262 
263         boolean aborted = false;
264         try {
265             tm.commit(t2);
266             fail("Didn't abort");
267         } catch (RollbackException e) {
268             aborted = true;
269         }
270         assertTrue(aborted, "Didn't raise exception");
271 
272         Transaction tscan = tm.begin();
273         rs = tt.getScanner(tscan, new Scan());
274         r = rs.next();
275         count = 0;
276         while (r != null) {
277             count++;
278             r = rs.next();
279         }
280         assertEquals(count, rowcount, "Wrong count");
281 
282     }
283 
284     private int countRows(Table table) throws IOException {
285         Scan scan = new Scan();
286         ResultScanner scanner = table.getScanner(scan);
287         Result r = scanner.next();
288         int rowCount = 0;
289         while (r != null) {
290             r = scanner.next();
291             rowCount++;
292         }
293         return rowCount;
294     }
295 
296     @Test(timeOut = 60_000)
297     public void testBatchedCleanup(ITestContext context) throws Exception {
298 
299         String table2 = "testBatchedCleanupTABLE2";
300         TableName table2Name = TableName.valueOf(table2);
301 
302         try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
303              Admin admin = conn.getAdmin()) {
304             TableName htable2 = TableName.valueOf(table2);
305 
306             if (!admin.tableExists(htable2)) {
307                 HTableDescriptor desc = new HTableDescriptor(table2Name);
308                 HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
309                 datafam.setMaxVersions(Integer.MAX_VALUE);
310                 desc.addFamily(datafam);
311 
312                 admin.createTable(desc);
313             }
314 
315             if (admin.isTableDisabled(htable2)) {
316                 admin.enableTable(htable2);
317             }
318         }
319 
320         TransactionManager tm = newTransactionManager(context);
321         TTable tt = new TTable(connection, TEST_TABLE);
322         TTable tt2 = new TTable(connection, table2);
323 
324         Transaction t1 = tm.begin();
325         LOG.info("Transaction created " + t1);
326 
327         Transaction t2 = tm.begin();
328         LOG.info("Transaction created" + t2);
329 
330         byte[] row = Bytes.toBytes("test-simple");
331         byte[] fam = Bytes.toBytes(TEST_FAMILY);
332         byte[] col = Bytes.toBytes("testdata");
333         byte[] data1 = Bytes.toBytes("testWrite-1");
334         byte[] data2 = Bytes.toBytes("testWrite-2");
335 
336         Put p = new Put(row);
337         p.addColumn(fam, col, data1);
338         tt.put(t1, p);
339 
340         Get g = new Get(row).setMaxVersions();
341         g.addColumn(fam, col);
342         Result r = tt.getHTable().get(g);
343         assertEquals(r.size(), 1, "Unexpected size for read.");
344         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
345                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
346 
347         int rowcount = HBaseTransaction.MAX_DELETE_BATCH_SIZE*2 + 2;
348 
349         // Add this row to cause conflict
350         Put p2 = new Put(row);
351         p2.addColumn(fam, col, data2);
352         tt.put(t2, p2);
353 
354         //Add more rows to hit batch
355         for (int i = 0; i < rowcount; i++) {
356             byte[] newRow = Bytes.toBytes("test-del" + i);
357             Put put = new Put(newRow);
358             put.addColumn(fam, col, data2);
359             tt.put(t2, put);
360             tt2.put(t2, put);
361         }
362 
363         // validate rows are really written
364         assertEquals(countRows(tt.getHTable()), rowcount + 1, "Unexpected size for read.");
365         assertEquals(countRows(tt2.getHTable()), rowcount, "Unexpected size for read.");
366 
367         tm.commit(t1);
368 
369         boolean aborted = false;
370         try {
371             tm.commit(t2);
372             fail("Transaction commited successfully");
373         } catch (RollbackException e) {
374             aborted = true;
375         }
376         assertTrue(aborted, "Transaction didn't raise exception");
377 
378         // validate rows are cleaned
379         assertEquals(countRows(tt.getHTable()), 1, "Unexpected size for read.");
380         assertEquals(countRows(tt2.getHTable()), 0, "Unexpected size for read.");
381     }
382 
383 
384     @Test(timeOut = 10_000)
385     public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
386         TransactionManager tm = newTransactionManager(context);
387         TTable tt = new TTable(connection, TEST_TABLE);
388 
389         Transaction t1 = tm.begin();
390         Transaction t2 = tm.begin();
391         LOG.info("Transactions created " + t1 + " " + t2);
392 
393         byte[] row = Bytes.toBytes("row");
394         byte[] fam = Bytes.toBytes(TEST_FAMILY);
395         byte[] col1 = Bytes.toBytes("testdata1");
396         byte[] col2 = Bytes.toBytes("testdata2");
397         byte[] data = Bytes.toBytes("testWrite-1");
398 
399         Put p2 = new Put(row);
400         p2.addColumn(fam, col1, data);
401         tt.put(t2, p2);
402         tm.commit(t2);
403 
404         Put p1 = new Put(row);
405         p1.addColumn(fam, col2, data);
406         tt.put(t1, p1);
407         tm.commit(t1);
408     }
409 
410     @Test(timeOut = 10_000)
411     public void runTestWriteWriteConflictWithAdditionalConflictFreeWrites(ITestContext context) throws Exception {
412         TransactionManager tm = newTransactionManager(context);
413         TTable tt1 = new TTable(connection, TEST_TABLE);
414         TTable tt2 = new TTable(connection, TEST_TABLE, true);
415 
416         Transaction t1 = tm.begin();
417         LOG.info("Transaction created " + t1);
418 
419         Transaction t2 = tm.begin();
420         LOG.info("Transaction created" + t2);
421 
422         byte[] row = Bytes.toBytes("test-simple");
423         byte[] fam = Bytes.toBytes(TEST_FAMILY);
424         byte[] col = Bytes.toBytes("testdata");
425         byte[] data1 = Bytes.toBytes("testWrite-1");
426         byte[] data2 = Bytes.toBytes("testWrite-2");
427 
428         Put p = new Put(row);
429         p.addColumn(fam, col, data1);
430         tt1.put(t1, p);
431 
432         Put p2 = new Put(row);
433         p2.addColumn(fam, col, data2);
434         tt1.put(t2, p2);
435 
436         row = Bytes.toBytes("test-simple-cf");
437         p = new Put(row);
438         p.addColumn(fam, col, data1);
439         tt2.put(t1, p);
440 
441         p2 = new Put(row);
442         p2.addColumn(fam, col, data2);
443         tt2.put(t2, p2);
444 
445         tm.commit(t2);
446 
447         try {
448             tm.commit(t1);
449             fail("Transaction should not commit successfully");
450         } catch (RollbackException e) {
451         }
452     }
453 
454     @Test(timeOut = 10_000)
455     public void runTestWriteWriteConflictFreeWrites(ITestContext context) throws Exception {
456         TransactionManager tm = newTransactionManager(context);
457         TTable tt1 = new TTable(connection, TEST_TABLE);
458         TTable tt2 = new TTable(connection, TEST_TABLE, true);
459 
460         Transaction t1 = tm.begin();
461         LOG.info("Transaction created " + t1);
462 
463         Transaction t2 = tm.begin();
464         LOG.info("Transaction created" + t2);
465 
466         byte[] row = Bytes.toBytes("test-simple");
467         byte[] fam = Bytes.toBytes(TEST_FAMILY);
468         byte[] col = Bytes.toBytes("testdata");
469         byte[] data1 = Bytes.toBytes("testWrite-1");
470         byte[] data2 = Bytes.toBytes("testWrite-2");
471 
472         Put p = new Put(row);
473         p.addColumn(fam, col, data1);
474         tt1.put(t1, p);
475 
476         Put p2 = new Put(row);
477         p2.addColumn(fam, col, data2);
478         tt2.put(t2, p2);
479 
480         row = Bytes.toBytes("test-simple-cf");
481         p = new Put(row);
482         p.addColumn(fam, col, data1);
483         tt1.put(t1, p);
484 
485         p2 = new Put(row);
486         p2.addColumn(fam, col, data2);
487         tt2.put(t2, p2);
488 
489         tm.commit(t2);
490 
491         try {
492             tm.commit(t1);
493         } catch (RollbackException e) {
494             fail("Transaction should not commit successfully");
495         }
496     }
497 
498     @Test(timeOut = 10_000)
499     public void runTestWriteWriteConflictFreeWritesWithOtherWrites(ITestContext context) throws Exception {
500         TransactionManager tm = newTransactionManager(context);
501         TTable tt1 = new TTable(connection, TEST_TABLE);
502         TTable tt2 = new TTable(connection, TEST_TABLE, true);
503 
504         Transaction t1 = tm.begin();
505         LOG.info("Transaction created " + t1);
506 
507         Transaction t2 = tm.begin();
508         LOG.info("Transaction created" + t2);
509 
510         byte[] row = Bytes.toBytes("test-simple");
511         byte[] row1 = Bytes.toBytes("test-simple-1");
512         byte[] fam = Bytes.toBytes(TEST_FAMILY);
513         byte[] col = Bytes.toBytes("testdata");
514         byte[] data1 = Bytes.toBytes("testWrite-1");
515         byte[] data2 = Bytes.toBytes("testWrite-2");
516 
517         Put p = new Put(row);
518         p.addColumn(fam, col, data1);
519         tt1.put(t1, p);
520 
521         Put p2 = new Put(row1);
522         p2.addColumn(fam, col, data2);
523         tt1.put(t2, p2);
524 
525         row = Bytes.toBytes("test-simple-cf");
526         p = new Put(row);
527         p.addColumn(fam, col, data1);
528         tt2.put(t1, p);
529 
530         p2 = new Put(row);
531         p2.addColumn(fam, col, data2);
532         tt2.put(t2, p2);
533 
534         tm.commit(t2);
535 
536         try {
537             tm.commit(t1);
538         } catch (RollbackException e) {
539             fail("Transaction should not commit successfully");
540         }
541     }
542 
543     @Test(timeOut = 10_000)
544     public void runTestCleanupConflictFreeWritesAfterConflict(ITestContext context) throws Exception {
545         TransactionManager tm = newTransactionManager(context);
546         TTable tt1 = new TTable(connection, TEST_TABLE);
547         TTable tt2 = new TTable(connection, TEST_TABLE, true);
548 
549         Transaction t1 = tm.begin();
550         LOG.info("Transaction created " + t1);
551 
552         Transaction t2 = tm.begin();
553         LOG.info("Transaction created" + t2);
554 
555         byte[] row = Bytes.toBytes("test-simple");
556         byte[] row1 = Bytes.toBytes("test-simple-1");
557         byte[] fam = Bytes.toBytes(TEST_FAMILY);
558         byte[] col = Bytes.toBytes("testdata");
559         byte[] data1 = Bytes.toBytes("testWrite-1");
560         byte[] data2 = Bytes.toBytes("testWrite-2");
561 
562         Put p = new Put(row);
563         p.addColumn(fam, col, data1);
564         tt1.put(t1, p);
565 
566         Get g = new Get(row).setMaxVersions();
567         g.addColumn(fam, col);
568         Result r = tt1.getHTable().get(g);
569         assertEquals(r.size(), 1, "Unexpected size for read.");
570         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
571                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
572 
573         Put p2 = new Put(row);
574         p2.addColumn(fam, col, data2);
575         tt1.put(t2, p2);
576 
577         Put p3 = new Put(row1);
578         p3.addColumn(fam, col, data2);
579         tt2.put(t2, p3);
580 
581         r = tt1.getHTable().get(g);
582         assertEquals(r.size(), 2, "Unexpected size for read.");
583         r = tt2.get(t2, g);
584         assertEquals(r.size(),1, "Unexpected size for read.");
585         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
586                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
587 
588         Get g1 = new Get(row1).setMaxVersions();
589         g1.addColumn(fam, col);
590         r = tt1.getHTable().get(g1);
591         assertEquals(r.size(), 1, "Unexpected size for read.");
592 
593         tm.commit(t1);
594 
595         boolean aborted = false;
596         try {
597             tm.commit(t2);
598             fail("Transaction commited successfully");
599         } catch (RollbackException e) {
600             aborted = true;
601         }
602         assertTrue(aborted, "Transaction didn't raise exception");
603 
604         r = tt1.getHTable().get(g);
605         assertEquals(r.size(), 1, "Unexpected size for read.");
606         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
607                    "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
608         r = tt1.getHTable().get(g1);
609         assertEquals(r.size(), 0, "Unexpected size for read.");
610     }
611 }