View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.omid.examples;
19  
20  import java.io.IOException;
21  import java.util.Arrays;
22  
23  import org.apache.commons.lang.StringUtils;
24  import org.apache.hadoop.hbase.client.ConnectionFactory;
25  import org.apache.hadoop.hbase.client.Get;
26  import org.apache.hadoop.hbase.client.Put;
27  import org.apache.hadoop.hbase.client.Result;
28  import org.apache.hadoop.hbase.util.Bytes;
29  import org.apache.omid.transaction.HBaseTransactionManager;
30  import org.apache.omid.transaction.RollbackException;
31  import org.apache.omid.transaction.TTable;
32  import org.apache.omid.transaction.Transaction;
33  import org.apache.omid.transaction.TransactionManager;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
38  
39  /**
40   * ****************************************************************************************************************
41   *
42   * Example code which demonstrates the preservation of Snapshot Isolation when writing shared data concurrently
43   *
44   * ****************************************************************************************************************
45   *
46   * Please @see{BasicExample} first
47   *
48   * In the code below, two concurrent transactions (Tx1 and Tx2), try to update the same column in HBase. This will
49   * result in the rollback of Tx2 -the last one trying to commit- due to conflicts in the writeset with the previously
50   * committed transaction Tx1. Also shows how Tx2 reads the right values from its own snapshot in HBase data.
51   *
52   * After building the package with 'mvn clean package' find the resulting examples-{version}-bin.tar.gz file in the
53   * 'examples/target' folder. Copy it to the target host and expand with 'tar -zxvf examples-{version}-bin.tar.gz'.
54   *
55   * Make sure that 'hbase-site.xml' and 'core-site.xml' are either in classpath (see run.sh) or explicitly referenced via
56   * command line arguments. If a secure HBase deployment is needed, use also command line arguments to specify the
57   * principal (user) and keytab file.
58   *
59   * The example requires a user table to perform transactional read/write operations. A table is already specified in the
60   * default configuration, and can be created with the following command using the 'hbase shell':
61   *
62   * <pre>
63   * create 'MY_TX_TABLE', {NAME =&gt; 'MY_CF', VERSIONS =&gt; '2147483647', TTL =&gt; '2147483647'}
64   * </pre>
65   *
66   * Make sure that the principal/user has RW permissions for the given table using also the 'hbase shell':
67   * <pre>
68   * grant '{principal/user}', 'RW', 'MY_TX_TABLE'
69   * </pre>
70   *
71   * Alternatively, a table with a column family already created can be used by specifying the table name and column
72   * family identifiers using the command line arguments (see details also in 'run.sh') If a table namespace is required,
73   * specify it like this: 'namespace:table_name'
74   *
75   * Finally, run the example using the 'run.sh' script without arguments or specifying the necessary configuration
76   * parameters if required.
77   */
78  public class SnapshotIsolationExample {
79  
80      private static final Logger LOG = LoggerFactory.getLogger(SnapshotIsolationExample.class);
81      private final byte[] qualifier;
82      private final byte[] initialData;
83      private final byte[] dataValue1;
84      private final byte[] dataValue2;
85      private RowIdGenerator rowIdGenerator = new StaticRowIdGenerator();
86      private String userTableName;
87      private byte[] family;
88      private TransactionManager tm;
89      private TTable txTable;
90  
91      public static void main(String[] args) throws Exception {
92          SnapshotIsolationExample example = new SnapshotIsolationExample(args);
93          example.execute();
94          example.close();
95      }
96  
97      SnapshotIsolationExample(String[] args) throws IOException, InterruptedException {
98          LOG.info("Parsing the command line arguments");
99          userTableName = "MY_TX_TABLE";
100         if (args != null && args.length > 0 && StringUtils.isNotEmpty(args[0])) {
101             userTableName = args[0];
102         }
103         family = Bytes.toBytes("MY_CF");
104         if (args != null && args.length > 1 && StringUtils.isNotEmpty(args[1])) {
105             family = Bytes.toBytes(args[1]);
106         }
107         LOG.info("Table '{}', column family '{}'", userTableName, Bytes.toString(family));
108 
109         qualifier = Bytes.toBytes("MY_Q");
110         initialData = Bytes.toBytes("initialVal");
111         dataValue1 = Bytes.toBytes("val1");
112         dataValue2 = Bytes.toBytes("val2");
113 
114         LOG.info("--------");
115         LOG.info("NOTE: All Transactions in the Example access column {}:{}/{}/{} [TABLE:ROW/CF/Q]",
116                  userTableName, Bytes.toString(rowIdGenerator.getRowId()), Bytes.toString(family),
117                  Bytes.toString(qualifier));
118         LOG.info("--------");
119 
120         LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
121         tm = HBaseTransactionManager.newInstance();
122         txTable = new TTable(ConnectionFactory.createConnection(), userTableName);
123     }
124 
125     void execute() throws IOException, RollbackException {
126 
127         // A transaction Tx0 sets an initial value to a particular column in an specific row
128         Transaction tx0 = tm.begin();
129         byte[] rowId = rowIdGenerator.getRowId();
130         Put initialPut = new Put(rowId);
131         initialPut.addColumn(family, qualifier, initialData);
132         txTable.put(tx0, initialPut);
133         tm.commit(tx0);
134         LOG.info("Initial Transaction {} COMMITTED. Base value written in {}:{}/{}/{} = {}",
135                  tx0, userTableName, Bytes.toString(rowId), Bytes.toString(family),
136                  Bytes.toString(qualifier), Bytes.toString(initialData));
137 
138         // Transaction Tx1 starts, creates its own snapshot of the current data in HBase and writes new data
139         Transaction tx1 = tm.begin();
140         LOG.info("Transaction {} STARTED", tx1);
141         Put tx1Put = new Put(rowId);
142         tx1Put.addColumn(family, qualifier, dataValue1);
143         txTable.put(tx1, tx1Put);
144         LOG.info("Transaction {} updates base value in {}:{}/{}/{} = {} in its own Snapshot",
145                  tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
146                  Bytes.toString(qualifier), Bytes.toString(dataValue1));
147 
148         // A concurrent transaction Tx2 starts, creates its own snapshot and reads the column value
149         Transaction tx2 = tm.begin();
150         LOG.info("Concurrent Transaction {} STARTED", tx2);
151         Get tx2Get = new Get(rowId);
152         tx2Get.addColumn(family, qualifier);
153         Result tx2GetResult = txTable.get(tx2, tx2Get);
154         Preconditions.checkState(Arrays.equals(tx2GetResult.value(), initialData),
155                                  "As Tx1 is not yet committed, Tx2 should read the value set by Tx0 not the value written by Tx1");
156         LOG.info(
157             "Concurrent Transaction {} should read base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
158             tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
159             Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
160 
161         // Transaction Tx1 tries to commit and as there're no conflicting changes, persists the new value in HBase
162         tm.commit(tx1);
163         LOG.info("Transaction {} COMMITTED. New column value {}:{}/{}/{} = {}",
164                  tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
165                  Bytes.toString(qualifier), Bytes.toString(dataValue1));
166 
167         // Tx2 reading again after Tx1 commit must read data from its snapshot...
168         tx2Get = new Get(rowId);
169         tx2Get.addColumn(family, qualifier);
170         tx2GetResult = txTable.get(tx2, tx2Get);
171         // ...so it must read the initial value written by Tx0
172         LOG.info(
173             "Concurrent Transaction {} should read again base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
174             tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
175             Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
176         Preconditions.checkState(Arrays.equals(tx2GetResult.value(), initialData),
177                                  "Tx2 must read the initial value written by Tx0");
178 
179         // Tx2 tries to write the column written by the committed concurrent transaction Tx1...
180         Put tx2Put = new Put(rowId);
181         tx2Put.addColumn(family, qualifier, dataValue2);
182         txTable.put(tx2, tx2Put);
183         LOG.info(
184             "Concurrent Transaction {} updates {}:{}/{}/{} = {} in its own Snapshot (Will conflict with {} at commit time)",
185             tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
186             Bytes.toString(qualifier), Bytes.toString(dataValue1), tx1);
187 
188         // ... and when committing, Tx2 has to abort due to concurrent conflicts with committed transaction Tx1
189         try {
190             LOG.info("Concurrent Transaction {} TRYING TO COMMIT", tx2);
191             tm.commit(tx2);
192             // should throw an exception
193             Preconditions.checkState(false, "Should have thrown RollbackException");
194         } catch (RollbackException e) {
195             LOG.info("Concurrent Transaction {} ROLLED-BACK : {}", tx2, e.getMessage());
196         }
197     }
198 
199     private void close() throws IOException {
200         tm.close();
201         txTable.close();
202     }
203 
204 
205     void setRowIdGenerator(RowIdGenerator rowIdGenerator) {
206         this.rowIdGenerator = rowIdGenerator;
207     }
208 
209     private class StaticRowIdGenerator implements RowIdGenerator {
210 
211         @Override
212         public byte[] getRowId() {
213             return Bytes.toBytes("EXAMPLE_ROW");
214         }
215     }
216 }
217