1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.examples;
19
20 import java.io.IOException;
21 import java.util.Arrays;
22
23 import org.apache.commons.lang.StringUtils;
24 import org.apache.hadoop.hbase.client.ConnectionFactory;
25 import org.apache.hadoop.hbase.client.Get;
26 import org.apache.hadoop.hbase.client.Put;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.apache.omid.transaction.HBaseTransactionManager;
30 import org.apache.omid.transaction.RollbackException;
31 import org.apache.omid.transaction.TTable;
32 import org.apache.omid.transaction.Transaction;
33 import org.apache.omid.transaction.TransactionManager;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public class SnapshotIsolationExample {
79
80 private static final Logger LOG = LoggerFactory.getLogger(SnapshotIsolationExample.class);
81 private final byte[] qualifier;
82 private final byte[] initialData;
83 private final byte[] dataValue1;
84 private final byte[] dataValue2;
85 private RowIdGenerator rowIdGenerator = new StaticRowIdGenerator();
86 private String userTableName;
87 private byte[] family;
88 private TransactionManager tm;
89 private TTable txTable;
90
91 public static void main(String[] args) throws Exception {
92 SnapshotIsolationExample example = new SnapshotIsolationExample(args);
93 example.execute();
94 example.close();
95 }
96
97 SnapshotIsolationExample(String[] args) throws IOException, InterruptedException {
98 LOG.info("Parsing the command line arguments");
99 userTableName = "MY_TX_TABLE";
100 if (args != null && args.length > 0 && StringUtils.isNotEmpty(args[0])) {
101 userTableName = args[0];
102 }
103 family = Bytes.toBytes("MY_CF");
104 if (args != null && args.length > 1 && StringUtils.isNotEmpty(args[1])) {
105 family = Bytes.toBytes(args[1]);
106 }
107 LOG.info("Table '{}', column family '{}'", userTableName, Bytes.toString(family));
108
109 qualifier = Bytes.toBytes("MY_Q");
110 initialData = Bytes.toBytes("initialVal");
111 dataValue1 = Bytes.toBytes("val1");
112 dataValue2 = Bytes.toBytes("val2");
113
114 LOG.info("--------");
115 LOG.info("NOTE: All Transactions in the Example access column {}:{}/{}/{} [TABLE:ROW/CF/Q]",
116 userTableName, Bytes.toString(rowIdGenerator.getRowId()), Bytes.toString(family),
117 Bytes.toString(qualifier));
118 LOG.info("--------");
119
120 LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
121 tm = HBaseTransactionManager.newInstance();
122 txTable = new TTable(ConnectionFactory.createConnection(), userTableName);
123 }
124
125 void execute() throws IOException, RollbackException {
126
127
128 Transaction tx0 = tm.begin();
129 byte[] rowId = rowIdGenerator.getRowId();
130 Put initialPut = new Put(rowId);
131 initialPut.addColumn(family, qualifier, initialData);
132 txTable.put(tx0, initialPut);
133 tm.commit(tx0);
134 LOG.info("Initial Transaction {} COMMITTED. Base value written in {}:{}/{}/{} = {}",
135 tx0, userTableName, Bytes.toString(rowId), Bytes.toString(family),
136 Bytes.toString(qualifier), Bytes.toString(initialData));
137
138
139 Transaction tx1 = tm.begin();
140 LOG.info("Transaction {} STARTED", tx1);
141 Put tx1Put = new Put(rowId);
142 tx1Put.addColumn(family, qualifier, dataValue1);
143 txTable.put(tx1, tx1Put);
144 LOG.info("Transaction {} updates base value in {}:{}/{}/{} = {} in its own Snapshot",
145 tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
146 Bytes.toString(qualifier), Bytes.toString(dataValue1));
147
148
149 Transaction tx2 = tm.begin();
150 LOG.info("Concurrent Transaction {} STARTED", tx2);
151 Get tx2Get = new Get(rowId);
152 tx2Get.addColumn(family, qualifier);
153 Result tx2GetResult = txTable.get(tx2, tx2Get);
154 Preconditions.checkState(Arrays.equals(tx2GetResult.value(), initialData),
155 "As Tx1 is not yet committed, Tx2 should read the value set by Tx0 not the value written by Tx1");
156 LOG.info(
157 "Concurrent Transaction {} should read base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
158 tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
159 Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
160
161
162 tm.commit(tx1);
163 LOG.info("Transaction {} COMMITTED. New column value {}:{}/{}/{} = {}",
164 tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
165 Bytes.toString(qualifier), Bytes.toString(dataValue1));
166
167
168 tx2Get = new Get(rowId);
169 tx2Get.addColumn(family, qualifier);
170 tx2GetResult = txTable.get(tx2, tx2Get);
171
172 LOG.info(
173 "Concurrent Transaction {} should read again base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
174 tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
175 Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
176 Preconditions.checkState(Arrays.equals(tx2GetResult.value(), initialData),
177 "Tx2 must read the initial value written by Tx0");
178
179
180 Put tx2Put = new Put(rowId);
181 tx2Put.addColumn(family, qualifier, dataValue2);
182 txTable.put(tx2, tx2Put);
183 LOG.info(
184 "Concurrent Transaction {} updates {}:{}/{}/{} = {} in its own Snapshot (Will conflict with {} at commit time)",
185 tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
186 Bytes.toString(qualifier), Bytes.toString(dataValue1), tx1);
187
188
189 try {
190 LOG.info("Concurrent Transaction {} TRYING TO COMMIT", tx2);
191 tm.commit(tx2);
192
193 Preconditions.checkState(false, "Should have thrown RollbackException");
194 } catch (RollbackException e) {
195 LOG.info("Concurrent Transaction {} ROLLED-BACK : {}", tx2, e.getMessage());
196 }
197 }
198
199 private void close() throws IOException {
200 tm.close();
201 txTable.close();
202 }
203
204
205 void setRowIdGenerator(RowIdGenerator rowIdGenerator) {
206 this.rowIdGenerator = rowIdGenerator;
207 }
208
209 private class StaticRowIdGenerator implements RowIdGenerator {
210
211 @Override
212 public byte[] getRowId() {
213 return Bytes.toBytes("EXAMPLE_ROW");
214 }
215 }
216 }
217