1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.omid.tso;
19
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.ProgrammableTSOServer.Response.ResponseType;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
44
45
46
47
48 public class ProgrammableTSOServer extends SimpleChannelHandler {
49
50 private static final Logger LOG = LoggerFactory.getLogger(ProgrammableTSOServer.class);
51
52 private ChannelFactory factory;
53 private ChannelGroup channelGroup;
54
55 private Queue<Response> responseQueue = new LinkedList<>();
56
57 @Inject
58 public ProgrammableTSOServer(int port) {
59
60 factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
61 .setNameFormat("boss-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder()
62 .setNameFormat("worker-%d").build()), (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
63
64
65 channelGroup = new DefaultChannelGroup(ProgrammableTSOServer.class.getName());
66
67 ServerBootstrap bootstrap = new ServerBootstrap(factory);
68 bootstrap.setPipelineFactory(new TSOChannelHandler.TSOPipelineFactory(this));
69
70
71 Channel channel = bootstrap.bind(new InetSocketAddress(port));
72 channelGroup.add(channel);
73
74 LOG.info("********** Dumb TSO Server running on port {} **********", port);
75 }
76
77
78
79
80
81
82
83
84
85 public void queueResponse(Response r) {
86 responseQueue.add(r);
87 }
88
89
90
91
92 public void cleanResponses() {
93 responseQueue.clear();
94 }
95
96
97
98 @Override
99 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
100 channelGroup.add(ctx.getChannel());
101 }
102
103 @Override
104 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
105 channelGroup.remove(ctx.getChannel());
106 }
107
108
109
110
111 @Override
112 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
113 Object msg = e.getMessage();
114 if (msg instanceof TSOProto.Request) {
115 TSOProto.Request request = (TSOProto.Request) msg;
116 Channel channel = ctx.getChannel();
117 if (request.hasHandshakeRequest()) {
118 checkHandshake(ctx, request.getHandshakeRequest());
119 return;
120 }
121 if (!handshakeCompleted(ctx)) {
122 LOG.info("handshake not completed");
123 channel.close();
124 }
125
126 Response resp = responseQueue.poll();
127 if (request.hasTimestampRequest()) {
128 if (resp == null || resp.type != ResponseType.TIMESTAMP) {
129 throw new IllegalStateException("Expecting TS response to send but got " + resp);
130 }
131 TimestampResponse tsResp = (TimestampResponse) resp;
132 sendTimestampResponse(tsResp.startTS, channel);
133 } else if (request.hasCommitRequest()) {
134 if (resp == null) {
135 throw new IllegalStateException("Expecting COMMIT response to send but got null");
136 }
137 switch (resp.type) {
138 case COMMIT:
139 CommitResponse commitResp = (CommitResponse) resp;
140 sendCommitResponse(commitResp.startTS, commitResp.commitTS, channel);
141 break;
142 case ABORT:
143 AbortResponse abortResp = (AbortResponse) resp;
144 sendAbortResponse(abortResp.startTS, channel);
145 break;
146 default:
147 throw new IllegalStateException("Expecting COMMIT response to send but got " + resp.type);
148 }
149 } else {
150 LOG.error("Invalid request {}", request);
151 ctx.getChannel().close();
152 }
153 } else {
154 LOG.error("Unknown message type", msg);
155 }
156 }
157
158 @Override
159 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
160 if (e.getCause() instanceof ClosedChannelException) {
161 return;
162 }
163 LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
164 Channels.close(e.getChannel());
165 }
166
167 private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
168 TSOProto.HandshakeResponse.Builder response = TSOProto.HandshakeResponse.newBuilder();
169 if (request.hasClientCapabilities()) {
170
171 response.setClientCompatible(true).setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
172 TSOChannelContext tsoCtx = new TSOChannelContext();
173 tsoCtx.setHandshakeComplete();
174 ctx.setAttachment(tsoCtx);
175 } else {
176 response.setClientCompatible(false);
177 }
178 ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
179 }
180
181 private boolean handshakeCompleted(ChannelHandlerContext ctx) {
182 Object o = ctx.getAttachment();
183 if (o instanceof TSOChannelContext) {
184 TSOChannelContext tsoCtx = (TSOChannelContext) o;
185 return tsoCtx.getHandshakeComplete();
186 }
187 return false;
188 }
189
190 private void sendTimestampResponse(long startTimestamp, Channel c) {
191 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
192 TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
193 respBuilder.setStartTimestamp(startTimestamp);
194 builder.setTimestampResponse(respBuilder.build());
195 c.write(builder.build());
196 }
197
198 private void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
199 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
200 TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
201 commitBuilder.setAborted(false).setStartTimestamp(startTimestamp).setCommitTimestamp(commitTimestamp);
202 builder.setCommitResponse(commitBuilder.build());
203 c.write(builder.build());
204 }
205
206 private void sendAbortResponse(long startTimestamp, Channel c) {
207 TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
208 TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
209 commitBuilder.setAborted(true).setStartTimestamp(startTimestamp);
210 builder.setCommitResponse(commitBuilder.build());
211 c.write(builder.build());
212 }
213
214 private static class TSOChannelContext {
215 boolean handshakeComplete;
216
217 TSOChannelContext() {
218 handshakeComplete = false;
219 }
220
221 boolean getHandshakeComplete() {
222 return handshakeComplete;
223 }
224
225 void setHandshakeComplete() {
226 handshakeComplete = true;
227 }
228 }
229
230 public static class TimestampResponse extends Response {
231
232 final long startTS;
233
234 public TimestampResponse(long startTS) {
235 super(ResponseType.TIMESTAMP);
236 this.startTS = startTS;
237 }
238
239 }
240
241 public static class CommitResponse extends Response {
242
243 final long startTS;
244 final long commitTS;
245
246 public CommitResponse(long startTS, long commitTS) {
247 super(ResponseType.COMMIT);
248 this.startTS = startTS;
249 this.commitTS = commitTS;
250 }
251
252 }
253
254 public static class AbortResponse extends Response {
255
256 final long startTS;
257
258 public AbortResponse(long startTS) {
259 super(ResponseType.ABORT);
260 this.startTS = startTS;
261 }
262
263 }
264
265 abstract static class Response {
266
267 enum ResponseType {
268 TIMESTAMP, COMMIT, ABORT
269 }
270
271 final ResponseType type;
272
273 public Response(ResponseType type) {
274 this.type = type;
275 }
276
277 }
278
279 }