1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
20 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import org.apache.omid.metrics.MetricsRegistry;
23 import org.apache.omid.proto.TSOProto;
24 import org.jboss.netty.bootstrap.ServerBootstrap;
25 import org.jboss.netty.channel.Channel;
26 import org.jboss.netty.channel.ChannelFactory;
27 import org.jboss.netty.channel.ChannelHandler;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelPipelineFactory;
31 import org.jboss.netty.channel.ChannelStateEvent;
32 import org.jboss.netty.channel.Channels;
33 import org.jboss.netty.channel.ExceptionEvent;
34 import org.jboss.netty.channel.MessageEvent;
35 import org.jboss.netty.channel.SimpleChannelHandler;
36 import org.jboss.netty.channel.group.ChannelGroup;
37 import org.jboss.netty.channel.group.DefaultChannelGroup;
38 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
39 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
40 import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
41 import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
42 import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import javax.inject.Inject;
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.net.InetSocketAddress;
50 import java.nio.channels.ClosedChannelException;
51 import java.util.concurrent.Executors;
52
53
54
55
56
57
58 public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
59
60 private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
61
62 private final ChannelFactory factory;
63
64 private final ServerBootstrap bootstrap;
65
66 @VisibleForTesting
67 Channel listeningChannel;
68 @VisibleForTesting
69 ChannelGroup channelGroup;
70
71 private RequestProcessor requestProcessor;
72
73 private TSOServerConfig config;
74
75 private MetricsRegistry metrics;
76
77 @Inject
78 public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
79
80 this.config = config;
81 this.metrics = metrics;
82 this.requestProcessor = requestProcessor;
83
84 this.factory = new NioServerSocketChannelFactory(
85 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
86 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
87 (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
88
89 this.bootstrap = new ServerBootstrap(factory);
90 bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
91
92 }
93
94
95
96
97 void reconnect() {
98 if (listeningChannel == null && channelGroup == null) {
99 LOG.debug("Creating communication channel...");
100 } else {
101 LOG.debug("Reconnecting communication channel...");
102 closeConnection();
103 }
104
105 channelGroup = new DefaultChannelGroup(TSOChannelHandler.class.getName());
106 LOG.debug("\tCreating channel to listening for incoming connections in port {}", config.getPort());
107 listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort()));
108 channelGroup.add(listeningChannel);
109 LOG.debug("\tListening channel created and connected: {}", listeningChannel);
110 }
111
112
113
114
115 void closeConnection() {
116 LOG.debug("Closing communication channel...");
117 if (listeningChannel != null) {
118 LOG.debug("\tUnbinding listening channel {}", listeningChannel);
119 listeningChannel.unbind().awaitUninterruptibly();
120 LOG.debug("\tListening channel {} unbound", listeningChannel);
121 }
122 if (channelGroup != null) {
123 LOG.debug("\tClosing channel group {}", channelGroup);
124 channelGroup.close().awaitUninterruptibly();
125 LOG.debug("\tChannel group {} closed", channelGroup);
126 }
127 }
128
129
130
131
132
133 @Override
134 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
135 channelGroup.add(ctx.getChannel());
136 LOG.debug("TSO channel connected: {}", ctx.getChannel());
137 }
138
139 @Override
140 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
141 channelGroup.remove(ctx.getChannel());
142 LOG.debug("TSO channel disconnected: {}", ctx.getChannel());
143 }
144
145 @Override
146 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
147 LOG.debug("TSO channel closed: {}", ctx.getChannel());
148 }
149
150
151
152
153 @Override
154 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
155 Object msg = e.getMessage();
156 if (msg instanceof TSOProto.Request) {
157 TSOProto.Request request = (TSOProto.Request) msg;
158 if (request.hasHandshakeRequest()) {
159 checkHandshake(ctx, request.getHandshakeRequest());
160 return;
161 }
162 if (!handshakeCompleted(ctx)) {
163 LOG.error("Handshake not completed. Closing channel {}", ctx.getChannel());
164 ctx.getChannel().close();
165 }
166
167 if (request.hasTimestampRequest()) {
168 requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
169 } else if (request.hasCommitRequest()) {
170 TSOProto.CommitRequest cr = request.getCommitRequest();
171 requestProcessor.commitRequest(cr.getStartTimestamp(),
172 cr.getCellIdList(),
173 cr.getTableIdList(),
174 cr.getIsRetry(),
175 ctx.getChannel(),
176 MonitoringContextFactory.getInstance(config,metrics));
177 } else if (request.hasFenceRequest()) {
178 TSOProto.FenceRequest fr = request.getFenceRequest();
179 requestProcessor.fenceRequest(fr.getTableId(),
180 ctx.getChannel(),
181 MonitoringContextFactory.getInstance(config,metrics));
182 } else {
183 LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
184 ctx.getChannel().close();
185 }
186 } else {
187 LOG.error("Unknown message type", msg);
188 }
189 }
190
191 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
192 @Override
193 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
194 if (e.getCause() instanceof ClosedChannelException) {
195 LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
196 return;
197 }
198 LOG.warn("Unexpected exception from downstream. Closing channel {} {}", ctx.getChannel(), e.getCause());
199 ctx.getChannel().close();
200 }
201
202
203
204
205 @Override
206 public void close() throws IOException {
207 closeConnection();
208 factory.releaseExternalResources();
209 }
210
211
212
213
214
215
216
217
218 private static class TSOChannelContext {
219
220 boolean handshakeComplete;
221
222 TSOChannelContext() {
223 handshakeComplete = false;
224 }
225
226 boolean getHandshakeComplete() {
227 return handshakeComplete;
228 }
229
230 void setHandshakeComplete() {
231 handshakeComplete = true;
232 }
233
234 }
235
236 private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
237
238 TSOProto.HandshakeResponse.Builder response = TSOProto.HandshakeResponse.newBuilder();
239 if (request.hasClientCapabilities()) {
240
241 response.setClientCompatible(true)
242 .setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
243 TSOChannelContext tsoCtx = new TSOChannelContext();
244 tsoCtx.setHandshakeComplete();
245 ctx.setAttachment(tsoCtx);
246 } else {
247 response.setClientCompatible(false);
248 }
249 response.setLowLatency(config.getLowLatency());
250 ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
251
252 }
253
254 private boolean handshakeCompleted(ChannelHandlerContext ctx) {
255
256 Object o = ctx.getAttachment();
257 if (o instanceof TSOChannelContext) {
258 TSOChannelContext tsoCtx = (TSOChannelContext) o;
259 return tsoCtx.getHandshakeComplete();
260 }
261 return false;
262
263 }
264
265
266
267
268 static class TSOPipelineFactory implements ChannelPipelineFactory {
269
270 private final ChannelHandler handler;
271
272 TSOPipelineFactory(ChannelHandler handler) {
273 this.handler = handler;
274 }
275
276 public ChannelPipeline getPipeline() throws Exception {
277
278 ChannelPipeline pipeline = Channels.pipeline();
279
280
281
282 pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
283 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
284 pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
285 pipeline.addLast("protobufencoder", new ProtobufEncoder());
286 pipeline.addLast("handler", handler);
287
288 return pipeline;
289 }
290 }
291
292 }