1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.transaction;
19
20 import static org.junit.Assert.fail;
21 import static org.testng.Assert.assertEquals;
22 import static org.testng.Assert.assertTrue;
23
24 import org.apache.hadoop.hbase.client.Get;
25 import org.apache.hadoop.hbase.client.Put;
26 import org.apache.hadoop.hbase.client.Result;
27 import org.apache.hadoop.hbase.client.ResultScanner;
28 import org.apache.hadoop.hbase.client.Scan;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.testng.ITestContext;
33 import org.testng.annotations.Test;
34
35 @Test(groups = "sharedHBase")
36 public class TestBasicTransaction extends OmidTestBase {
37
38 private static final Logger LOG = LoggerFactory.getLogger(TestBasicTransaction.class);
39
40
41 @Test(timeOut = 30_000)
42 public void testTimestampsOfTwoRowsInstertedAfterCommitOfSingleTransactionAreEquals(ITestContext context) throws Exception {
43
44 TransactionManager tm = newTransactionManager(context);
45 TTable tt = new TTable(connection, TEST_TABLE);
46
47 byte[] rowName1 = Bytes.toBytes("row1");
48 byte[] rowName2 = Bytes.toBytes("row2");
49 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
50 byte[] colName1 = Bytes.toBytes("col1");
51 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
52 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
53
54 Transaction tx1 = tm.begin();
55
56 Put row1 = new Put(rowName1);
57 row1.addColumn(famName1, colName1, dataValue1);
58 tt.put(tx1, row1);
59 Put row2 = new Put(rowName2);
60 row2.addColumn(famName1, colName1, dataValue2);
61 tt.put(tx1, row2);
62
63 tm.commit(tx1);
64
65 tt.close();
66
67
68 Get getResultRow1 = new Get(rowName1).setMaxVersions(1);
69 Result result1 = tt.getHTable().get(getResultRow1);
70 byte[] val1 = result1.getValue(famName1, colName1);
71 assertTrue(Bytes.equals(dataValue1, result1.getValue(famName1, colName1)),
72 "Unexpected value for row 1 in col 1: " + Bytes.toString(val1));
73 long tsRow1 = result1.rawCells()[0].getTimestamp();
74
75 Get getResultRow2 = new Get(rowName2).setMaxVersions(1);
76 Result result2 = tt.getHTable().get(getResultRow2);
77 byte[] val2 = result2.getValue(famName1, colName1);
78 assertTrue(Bytes.equals(dataValue2, result2.getValue(famName1, colName1)),
79 "Unexpected value for row 2 in col 1: " + Bytes.toString(val2));
80 long tsRow2 = result2.rawCells()[0].getTimestamp();
81
82 assertEquals(tsRow2, tsRow1, "Timestamps of row 1 and row 2 are different");
83
84 }
85
86 @Test(timeOut = 30_000)
87 public void testTimestampsOfTwoRowsModifiedByTwoSequentialTransactionsAreEqualAndHaveBeenIncreasedMonotonically(ITestContext context)
88 throws Exception {
89
90 TransactionManager tm = newTransactionManager(context);
91 TTable tt = new TTable(connection, TEST_TABLE);
92
93 byte[] rowName1 = Bytes.toBytes("row1");
94 byte[] rowName2 = Bytes.toBytes("row2");
95 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
96 byte[] colName1 = Bytes.toBytes("col1");
97 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
98 byte[] dataValue2 = Bytes.toBytes("testWrite-2");
99
100 byte[] dataValue3 = Bytes.toBytes("testWrite-3");
101 byte[] dataValue4 = Bytes.toBytes("testWrite-4");
102
103 Transaction tx1 = tm.begin();
104
105 Put row1 = new Put(rowName1);
106 row1.addColumn(famName1, colName1, dataValue1);
107 tt.put(tx1, row1);
108 Put row2 = new Put(rowName2);
109 row2.addColumn(famName1, colName1, dataValue2);
110 tt.put(tx1, row2);
111
112 tm.commit(tx1);
113
114 Transaction tx2 = tm.begin();
115
116 row1 = new Put(rowName1);
117 row1.addColumn(famName1, colName1, dataValue3);
118 tt.put(tx2, row1);
119 row2 = new Put(rowName2);
120 row2.addColumn(famName1, colName1, dataValue4);
121 tt.put(tx2, row2);
122
123 tm.commit(tx2);
124
125 tt.close();
126
127
128 Get getResultRow1 = new Get(rowName1).setMaxVersions(2);
129 Result result1 = tt.getHTable().get(getResultRow1);
130 byte[] val1 = result1.getValue(famName1, colName1);
131 assertTrue(Bytes.equals(dataValue3, result1.getValue(famName1, colName1)),
132 "Unexpected value for row 1 in col 1: " + Bytes.toString(val1));
133
134 long lastTsRow1 = result1.rawCells()[0].getTimestamp();
135 long previousTsRow1 = result1.rawCells()[1].getTimestamp();
136
137 Get getResultRow2 = new Get(rowName2).setMaxVersions(2);
138 Result result2 = tt.getHTable().get(getResultRow2);
139 byte[] val2 = result2.getValue(famName1, colName1);
140 assertTrue(Bytes.equals(dataValue4, result2.getValue(famName1, colName1)),
141 "Unexpected value for row 2 in col 1: " + Bytes.toString(val2));
142
143 long lastTsRow2 = result2.rawCells()[0].getTimestamp();
144 long previousTsRow2 = result2.rawCells()[1].getTimestamp();
145
146 assertTrue(lastTsRow1 == lastTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different");
147 assertTrue(previousTsRow1 == previousTsRow2, "Timestamps assigned by Tx2 to row 1 and row 2 are different");
148 assertTrue(lastTsRow1 > previousTsRow1, "Timestamp assigned by Tx2 to row 1 hasn't increased monotonically");
149 assertTrue(lastTsRow2 > previousTsRow2, "Timestamp assigned by Tx2 to row 2 hasn't increased monotonically");
150
151 }
152
153 @Test(timeOut = 30_000)
154 public void runTestSimple(ITestContext context) throws Exception {
155
156 TransactionManager tm = newTransactionManager(context);
157
158 TTable tt = new TTable(connection, TEST_TABLE);
159
160 Transaction t1 = tm.begin();
161 LOG.info("Transaction created " + t1);
162
163 byte[] row = Bytes.toBytes("test-simple");
164 byte[] fam = Bytes.toBytes(TEST_FAMILY);
165 byte[] col = Bytes.toBytes("testdata");
166 byte[] data1 = Bytes.toBytes("testWrite-1");
167 byte[] data2 = Bytes.toBytes("testWrite-2");
168
169 Put p = new Put(row);
170 p.addColumn(fam, col, data1);
171 tt.put(t1, p);
172 tm.commit(t1);
173
174 Transaction tread = tm.begin();
175 Transaction t2 = tm.begin();
176 p = new Put(row);
177 p.addColumn(fam, col, data2);
178 tt.put(t2, p);
179 tm.commit(t2);
180
181 Get g = new Get(row).setMaxVersions(1);
182 Result r = tt.getHTable().get(g);
183 assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
184 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
185
186 r = tt.get(tread, g);
187 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
188 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
189 }
190
191 @Test(timeOut = 30_000)
192 public void runTestManyVersions(ITestContext context) throws Exception {
193
194 TransactionManager tm = newTransactionManager(context);
195 TTable tt = new TTable(connection, TEST_TABLE);
196
197 Transaction t1 = tm.begin();
198 LOG.info("Transaction created " + t1);
199
200 byte[] row = Bytes.toBytes("test-simple");
201 byte[] fam = Bytes.toBytes(TEST_FAMILY);
202 byte[] col = Bytes.toBytes("testdata");
203 byte[] data1 = Bytes.toBytes("testWrite-1");
204 byte[] data2 = Bytes.toBytes("testWrite-2");
205
206 Put p = new Put(row);
207 p.addColumn(fam, col, data1);
208 tt.put(t1, p);
209 tm.commit(t1);
210
211 for (int i = 0; i < 5; ++i) {
212 Transaction t2 = tm.begin();
213 p = new Put(row);
214 p.addColumn(fam, col, data2);
215 tt.put(t2, p);
216 }
217 Transaction tread = tm.begin();
218
219 Get g = new Get(row).setMaxVersions(1);
220 Result r = tt.getHTable().get(g);
221 assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
222 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
223
224 r = tt.get(tread, g);
225 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
226 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
227
228 }
229
230 @Test(timeOut = 30_000)
231 public void runTestInterleave(ITestContext context) throws Exception {
232
233 TransactionManager tm = newTransactionManager(context);
234 TTable tt = new TTable(connection, TEST_TABLE);
235
236 Transaction t1 = tm.begin();
237 LOG.info("Transaction created " + t1);
238
239 byte[] row = Bytes.toBytes("test-interleave");
240 byte[] fam = Bytes.toBytes(TEST_FAMILY);
241 byte[] col = Bytes.toBytes("testdata");
242 byte[] data1 = Bytes.toBytes("testWrite-1");
243 byte[] data2 = Bytes.toBytes("testWrite-2");
244
245 Put p = new Put(row);
246 p.addColumn(fam, col, data1);
247 tt.put(t1, p);
248 tm.commit(t1);
249
250 Transaction t2 = tm.begin();
251 p = new Put(row);
252 p.addColumn(fam, col, data2);
253 tt.put(t2, p);
254
255 Transaction tread = tm.begin();
256 Get g = new Get(row).setMaxVersions(1);
257 Result r = tt.get(tread, g);
258 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
259 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
260 try {
261 tm.commit(t2);
262 } catch (RollbackException e) {
263 if (!getClient(context).isLowLatency())
264 fail();
265 return;
266 }
267 r = tt.getHTable().get(g);
268 assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
269 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
270
271 }
272
273 @Test(expectedExceptions = IllegalArgumentException.class, timeOut = 30_000)
274 public void testSameCommitRaisesException(ITestContext context) throws Exception {
275 TransactionManager tm = newTransactionManager(context);
276
277 Transaction t1 = tm.begin();
278 tm.commit(t1);
279 tm.commit(t1);
280 }
281
282 @Test(timeOut = 30_000)
283 public void testInterleavedScanReturnsTheRightSnapshotResults(ITestContext context) throws Exception {
284
285 TransactionManager tm = newTransactionManager(context);
286 TTable txTable = new TTable(connection, TEST_TABLE);
287
288
289 byte[] fam = Bytes.toBytes(TEST_FAMILY);
290 byte[] col = Bytes.toBytes("TEST_COL");
291 byte[] data1 = Bytes.toBytes("testWrite-1");
292 byte[] data2 = Bytes.toBytes("testWrite-2");
293
294 byte[] startRow = Bytes.toBytes("row-to-scan" + 0);
295 byte[] stopRow = Bytes.toBytes("row-to-scan" + 9);
296 byte[] randomRow = Bytes.toBytes("row-to-scan" + 3);
297
298
299 Transaction tx1 = tm.begin();
300 for (int i = 0; i < 10; i++) {
301 byte[] row = Bytes.toBytes("row-to-scan" + i);
302
303 Put p = new Put(row);
304 p.addColumn(fam, col, data1);
305 txTable.put(tx1, p);
306 }
307 tm.commit(tx1);
308
309
310
311 Transaction tx2 = tm.begin();
312 Put p = new Put(randomRow);
313 p.addColumn(fam, col, data2);
314 txTable.put(tx2, p);
315
316 Transaction scanTx = tm.begin();
317 ResultScanner rs = txTable.getScanner(scanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
318 Result r = rs.next();
319 int i = 0;
320 while (r != null) {
321 LOG.trace("Scan (" + ++i + ")" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col)));
322 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
323 "Unexpected value for SI scan " + scanTx + ": " + Bytes.toString(r.getValue(fam, col)));
324 r = rs.next();
325 }
326
327
328
329 if (getClient(context).isLowLatency()) {
330
331 return;
332 }
333
334 tm.commit(tx2);
335
336 int modifiedRows = 0;
337 Transaction newScanTx = tm.begin();
338 ResultScanner newRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
339 Result[] results = newRS.next(10);
340 for (Result result : results) {
341 if (Bytes.equals(data2, result.getValue(fam, col))) {
342 LOG.trace("Modified :" + Bytes.toString(result.getRow()));
343 modifiedRows++;
344 }
345 }
346 assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
347
348
349 modifiedRows = 0;
350 ResultScanner iterableRS = txTable.getScanner(newScanTx, new Scan().setStartRow(startRow).setStopRow(stopRow));
351 for (Result res : iterableRS) {
352 if (Bytes.equals(data2, res.getValue(fam, col))) {
353 LOG.trace("Modified :" + Bytes.toString(res.getRow()));
354 modifiedRows++;
355 }
356 }
357
358 assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
359
360
361 try {
362 iterableRS.iterator().remove();
363 fail();
364 } catch (RuntimeException re) {
365
366 }
367
368 }
369
370 @Test(timeOut = 30_000)
371 public void testInterleavedScanReturnsTheRightSnapshotResultsWhenATransactionAborts(ITestContext context)
372 throws Exception {
373
374 TransactionManager tm = newTransactionManager(context);
375 TTable txTable = new TTable(connection, TEST_TABLE);
376
377
378 byte[] fam = Bytes.toBytes(TEST_FAMILY);
379 byte[] col = Bytes.toBytes("TEST_COL");
380 byte[] data1 = Bytes.toBytes("testWrite-1");
381 byte[] data2 = Bytes.toBytes("testWrite-2");
382
383 byte[] startRow = Bytes.toBytes("row-to-scan" + 0);
384 byte[] stopRow = Bytes.toBytes("row-to-scan" + 9);
385 byte[] randomRow = Bytes.toBytes("row-to-scan" + 3);
386
387
388 Transaction tx1 = tm.begin();
389 for (int i = 0; i < 10; i++) {
390 byte[] row = Bytes.toBytes("row-to-scan" + i);
391
392 Put p = new Put(row);
393 p.addColumn(fam, col, data1);
394 txTable.put(tx1, p);
395 }
396 tm.commit(tx1);
397
398
399
400 Transaction tx2 = tm.begin();
401 Put p = new Put(randomRow);
402 p.addColumn(fam, col, data2);
403 txTable.put(tx2, p);
404
405 int modifiedRows = 0;
406 ResultScanner rs = txTable.getScanner(tx2, new Scan().setStartRow(startRow).setStopRow(stopRow));
407 Result r = rs.next();
408 while (r != null) {
409 if (Bytes.equals(data2, r.getValue(fam, col))) {
410 LOG.trace("Modified :" + Bytes.toString(r.getRow()));
411 modifiedRows++;
412 }
413
414 r = rs.next();
415 }
416
417 assertEquals(modifiedRows, 1, "Expected 1 row modified, but " + modifiedRows + " are.");
418
419
420
421 tm.rollback(tx2);
422
423 Transaction txScan = tm.begin();
424 rs = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow));
425 r = rs.next();
426 while (r != null) {
427 LOG.trace("Scan1 :" + Bytes.toString(r.getRow()) + " => " + Bytes.toString(r.getValue(fam, col)));
428 assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
429 "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(r.getValue(fam, col)));
430 r = rs.next();
431 }
432
433
434 ResultScanner iterableRS = txTable.getScanner(txScan, new Scan().setStartRow(startRow).setStopRow(stopRow));
435 for (Result result : iterableRS) {
436 assertTrue(Bytes.equals(data1, result.getValue(fam, col)),
437 "Unexpected value for SI scan " + txScan + ": " + Bytes.toString(result.getValue(fam, col)));
438 }
439
440
441 try {
442 iterableRS.iterator().remove();
443 fail();
444 } catch (RuntimeException re) {
445
446 }
447
448 }
449
450 @Test(timeOut = 30_000)
451 public void testAutoCommit(ITestContext context)
452 throws Exception {
453
454 TransactionManager tm = newTransactionManager(context);
455 TTable tt = new TTable(connection, TEST_TABLE);
456
457 byte[] rowName1 = Bytes.toBytes("row1");
458 byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
459 byte[] colName1 = Bytes.toBytes("col1");
460 byte[] dataValue1 = Bytes.toBytes("testWrite-1");
461
462 Transaction tx1 = tm.begin();
463
464 Put row1 = new Put(rowName1);
465 row1.addColumn(famName1, colName1, dataValue1);
466 tt.put(tx1, row1);
467
468 Transaction tx2 = tm.begin();
469
470 Transaction tx3 = tm.begin();
471
472 Get g = new Get(rowName1).setMaxVersions();
473 g.addColumn(famName1, colName1);
474 Result r = tt.get(tx3, g);
475 assertEquals(r.size(), 0, "Unexpected size for read.");
476
477 row1 = new Put(rowName1);
478 row1.addColumn(famName1, colName1, dataValue1);
479 tt.put(tx2, row1, true);
480
481 r = tt.get(tx3, g);
482 assertEquals(r.size(), 1, "Unexpected size for read.");
483
484 tt.close();
485 }
486
487 }