1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.omid.HBaseShims;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
33
34 public class RegionConnectionFactory {
35 public static final String COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER = "omid.commit.table.access.on.compaction.retries";
36 public static final String COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE = "omid.commit.table.access.on.compaction.retry.pause";
37 public static final String COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER = "omid.commit.table.access.on.read.retries";
38 public static final String COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE = "omid.commit.table.access.on.read.retry.pause";
39
40 private static final Logger LOG = LoggerFactory.getLogger(RegionConnectionFactory.class);
41 private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER = 20;
42 private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE = 100;
43
44
45
46
47
48
49
50
51
52
53 private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER = HBaseShims.getNoRetriesNumber() + 1;
54 private static final int DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE = 100;
55
56 private RegionConnectionFactory() {
57 }
58
59 public static enum ConnectionType {
60 COMPACTION_CONNECTION,
61 READ_CONNECTION,
62 DEFAULT_SERVER_CONNECTION;
63 }
64
65 private static Map<ConnectionType, Connection> connections =
66 new HashMap<ConnectionType, Connection>();
67
68
69
70
71
72
73
74
75
76
77 private static Configuration cloneConfig(Configuration toCopy) {
78 Configuration clone = new Configuration();
79 Iterator<Entry<String, String>> iterator = toCopy.iterator();
80 while (iterator.hasNext()) {
81 Entry<String, String> entry = iterator.next();
82 clone.set(entry.getKey(), entry.getValue());
83 }
84 return clone;
85 }
86
87 public static void shutdown() {
88 synchronized (RegionConnectionFactory.class) {
89 for (Connection connection : connections.values()) {
90 try {
91 connection.close();
92 } catch (IOException e) {
93 LOG.warn("Unable to close coprocessor connection", e);
94 }
95 }
96 connections.clear();
97 }
98 }
99
100
101 public static Connection getConnection(final ConnectionType connectionType, final RegionCoprocessorEnvironment env) throws IOException {
102 Connection connection = null;
103 if((connection = connections.get(connectionType)) == null) {
104 synchronized (RegionConnectionFactory.class) {
105 if((connection = connections.get(connectionType)) == null) {
106 connection = HBaseShims.newServerConnection(getTypeSpecificConfiguration(connectionType, env.getConfiguration()), env);
107 connections.put(connectionType, connection);
108 return connection;
109 }
110 }
111 }
112 return connection;
113 }
114
115 private static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) {
116 switch (connectionType) {
117 case COMPACTION_CONNECTION:
118 return getCompactionConfig(conf);
119 case DEFAULT_SERVER_CONNECTION:
120 return conf;
121 case READ_CONNECTION:
122 return getReadConfig(conf);
123 default:
124 return conf;
125 }
126 }
127
128 private static Configuration getCompactionConfig(Configuration conf) {
129 Configuration compactionConfig = cloneConfig(conf);
130
131 compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
132 conf.getInt(COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER,
133 DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRIES_NUMBER));
134 compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
135 conf.getInt(COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE,
136 DEFAULT_COMMIT_TABLE_ACCESS_ON_COMPACTION_RETRY_PAUSE));
137 return compactionConfig;
138 }
139
140 private static Configuration getReadConfig(Configuration conf) {
141 Configuration compactionConfig = cloneConfig(conf);
142
143 compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
144 conf.getInt(COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER,
145 DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRIES_NUMBER));
146 compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
147 conf.getInt(COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE,
148 DEFAULT_COMMIT_TABLE_ACCESS_ON_READ_RETRY_PAUSE));
149 return compactionConfig;
150 }
151 }