From 778b616540c0e14b42abb000c5bdd3924c436012 Mon Sep 17 00:00:00 2001 From: fengfei Date: Tue, 29 Aug 2017 17:08:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=8F=91=E9=80=81=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=90=8E=E5=86=8D=E5=85=B3=E9=97=AD=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=EF=BC=8C=E8=A7=A3=E5=86=B3http1.0=E6=95=B0=E6=8D=AE=E4=BC=A0?= =?UTF-8?q?=E8=BE=93=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/handlers/ServerChannelHandler.java | 259 +++++++++--------- 1 file changed, 131 insertions(+), 128 deletions(-) diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java index 5608289..d2ec217 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java @@ -1,129 +1,132 @@ -package org.fengfei.lanproxy.server.handlers; - -import java.util.List; - -import org.fengfei.lanproxy.protocol.ProxyMessage; -import org.fengfei.lanproxy.server.ProxyChannelManager; -import org.fengfei.lanproxy.server.config.ProxyConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; - -/** - * - * @author fengfei - * - */ -public class ServerChannelHandler extends SimpleChannelInboundHandler { - - private static Logger logger = LoggerFactory.getLogger(ServerChannelHandler.class); - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ProxyMessage proxyMessage) throws Exception { - logger.debug("ProxyMessage received {}", proxyMessage.getType()); - switch (proxyMessage.getType()) { - case ProxyMessage.TYPE_HEARTBEAT: - handleHeartbeatMessage(ctx, proxyMessage); - break; - case ProxyMessage.TYPE_AUTH: - handleAuthMessage(ctx, proxyMessage); - break; - case ProxyMessage.TYPE_CONNECT: - handleConnectMessage(ctx, proxyMessage); - break; - case ProxyMessage.TYPE_DISCONNECT: - handleDisconnectMessage(ctx, proxyMessage); - break; - case ProxyMessage.TYPE_TRANSFER: - handleTransferMessage(ctx, proxyMessage); - break; - case ProxyMessage.TYPE_WRITE_CONTROL: - handleWriteControlMessage(ctx, proxyMessage); - break; - default: - break; - } - } - - private void handleWriteControlMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { - String userId = proxyMessage.getUri(); - Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId); - if (userChannel != null) { - - // 同步代理客户端与后端服务器的连接可写状态 - boolean writeable = proxyMessage.getData()[0] == 0x01 ? true : false; - ProxyChannelManager.setUserChannelReadability(userChannel, writeable, null); - } - } - - private void handleTransferMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { - String userId = proxyMessage.getUri(); - Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId); - if (userChannel != null) { - ByteBuf buf = ctx.alloc().buffer(proxyMessage.getData().length); - buf.writeBytes(proxyMessage.getData()); - userChannel.writeAndFlush(buf); - } - } - - private void handleDisconnectMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { - String userId = proxyMessage.getUri(); - Channel userChannel = ProxyChannelManager.removeUserChannel(ctx.channel(), userId); - if (userChannel != null) { - userChannel.close(); - } - } - - private void handleConnectMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { - String userId = proxyMessage.getUri(); - Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId); - if (userChannel != null) { - - // 代理客户端与后端服务器连接成功,修改用户连接为可读状态 - ProxyChannelManager.setUserChannelReadability(userChannel, true, ctx.channel().isWritable()); - } - } - - private void handleHeartbeatMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { - ProxyMessage heartbeatMessage = new ProxyMessage(); - heartbeatMessage.setSerialNumber(heartbeatMessage.getSerialNumber()); - heartbeatMessage.setType(ProxyMessage.TYPE_HEARTBEAT); - logger.debug("response heartbeat message {}", heartbeatMessage); - ctx.channel().writeAndFlush(heartbeatMessage); - } - - private void handleAuthMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { - String clientKey = proxyMessage.getUri(); - List ports = ProxyConfig.getInstance().getClientInetPorts(clientKey); - if (ports == null) { - logger.info("error clientKey {}, {}", clientKey, ctx.channel()); - ctx.channel().close(); - return; - } - - logger.info("set port => channel, {} {}", clientKey, ports); - ProxyChannelManager.addProxyChannel(ports, clientKey, ctx.channel()); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - ProxyChannelManager.notifyProxyChannelWritabilityChanged(ctx.channel()); - super.channelWritabilityChanged(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - ProxyChannelManager.removeProxyChannel(ctx.channel()); - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("exception caught", cause); - super.exceptionCaught(ctx, cause); - } +package org.fengfei.lanproxy.server.handlers; + +import java.util.List; + +import org.fengfei.lanproxy.protocol.ProxyMessage; +import org.fengfei.lanproxy.server.ProxyChannelManager; +import org.fengfei.lanproxy.server.config.ProxyConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +/** + * + * @author fengfei + * + */ +public class ServerChannelHandler extends SimpleChannelInboundHandler { + + private static Logger logger = LoggerFactory.getLogger(ServerChannelHandler.class); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ProxyMessage proxyMessage) throws Exception { + logger.debug("ProxyMessage received {}", proxyMessage.getType()); + switch (proxyMessage.getType()) { + case ProxyMessage.TYPE_HEARTBEAT: + handleHeartbeatMessage(ctx, proxyMessage); + break; + case ProxyMessage.TYPE_AUTH: + handleAuthMessage(ctx, proxyMessage); + break; + case ProxyMessage.TYPE_CONNECT: + handleConnectMessage(ctx, proxyMessage); + break; + case ProxyMessage.TYPE_DISCONNECT: + handleDisconnectMessage(ctx, proxyMessage); + break; + case ProxyMessage.TYPE_TRANSFER: + handleTransferMessage(ctx, proxyMessage); + break; + case ProxyMessage.TYPE_WRITE_CONTROL: + handleWriteControlMessage(ctx, proxyMessage); + break; + default: + break; + } + } + + private void handleWriteControlMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { + String userId = proxyMessage.getUri(); + Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId); + if (userChannel != null) { + + // 同步代理客户端与后端服务器的连接可写状态 + boolean writeable = proxyMessage.getData()[0] == 0x01 ? true : false; + ProxyChannelManager.setUserChannelReadability(userChannel, writeable, null); + } + } + + private void handleTransferMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { + String userId = proxyMessage.getUri(); + Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId); + if (userChannel != null) { + ByteBuf buf = ctx.alloc().buffer(proxyMessage.getData().length); + buf.writeBytes(proxyMessage.getData()); + userChannel.writeAndFlush(buf); + } + } + + private void handleDisconnectMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { + String userId = proxyMessage.getUri(); + Channel userChannel = ProxyChannelManager.removeUserChannel(ctx.channel(), userId); + if (userChannel != null) { + // 数据发送完成后再关闭连接,解决http1.0数据传输问题 + ByteBuf buf = ctx.alloc().buffer(0); + userChannel.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE); + } + } + + private void handleConnectMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { + String userId = proxyMessage.getUri(); + Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId); + if (userChannel != null) { + + // 代理客户端与后端服务器连接成功,修改用户连接为可读状态 + ProxyChannelManager.setUserChannelReadability(userChannel, true, ctx.channel().isWritable()); + } + } + + private void handleHeartbeatMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { + ProxyMessage heartbeatMessage = new ProxyMessage(); + heartbeatMessage.setSerialNumber(heartbeatMessage.getSerialNumber()); + heartbeatMessage.setType(ProxyMessage.TYPE_HEARTBEAT); + logger.debug("response heartbeat message {}", heartbeatMessage); + ctx.channel().writeAndFlush(heartbeatMessage); + } + + private void handleAuthMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) { + String clientKey = proxyMessage.getUri(); + List ports = ProxyConfig.getInstance().getClientInetPorts(clientKey); + if (ports == null) { + logger.info("error clientKey {}, {}", clientKey, ctx.channel()); + ctx.channel().close(); + return; + } + + logger.info("set port => channel, {} {}", clientKey, ports); + ProxyChannelManager.addProxyChannel(ports, clientKey, ctx.channel()); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ProxyChannelManager.notifyProxyChannelWritabilityChanged(ctx.channel()); + super.channelWritabilityChanged(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ProxyChannelManager.removeProxyChannel(ctx.channel()); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.error("exception caught", cause); + super.exceptionCaught(ctx, cause); + } } \ No newline at end of file