From ca4d4c4753fcba9783abb3395800abc39afcd019 Mon Sep 17 00:00:00 2001 From: fengfei Date: Wed, 1 Nov 2017 20:40:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B6=E5=8F=91=E4=BC=98=E5=8C=96=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E5=85=BC=E5=AE=B9=E4=BB=A5=E5=89=8D=E7=9A=84=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 +- .../client/ClientChannelMannager.java | 87 ++++++++---- .../fengfei/lanproxy/client/Constants.java | 13 ++ .../lanproxy/client/ProxyClientContainer.java | 4 +- .../client/handlers/ClientChannelHandler.java | 119 +++++++++++------ .../handlers/RealServerChannelHandler.java | 7 +- .../listener/ProxyChannelBorrowListener.java | 11 ++ .../src/test/resources/log4j.properties | 2 +- .../fengfei/lanproxy/server/Constants.java | 13 ++ .../lanproxy/server/ProxyChannelManager.java | 116 +++++++--------- .../server/config/web/routes/RouteConfig.java | 8 +- .../server/handlers/ServerChannelHandler.java | 125 +++++++++++++----- .../server/handlers/UserChannelHandler.java | 46 ++++--- .../src/test/resources/config.properties | 2 +- .../src/test/resources/log4j.properties | 2 +- 15 files changed, 365 insertions(+), 194 deletions(-) create mode 100644 proxy-client/src/main/java/org/fengfei/lanproxy/client/Constants.java create mode 100644 proxy-client/src/main/java/org/fengfei/lanproxy/client/listener/ProxyChannelBorrowListener.java create mode 100644 proxy-server/src/main/java/org/fengfei/lanproxy/server/Constants.java diff --git a/pom.xml b/pom.xml index 96fd0c1..6b4106f 100644 --- a/pom.xml +++ b/pom.xml @@ -49,8 +49,8 @@ maven-compiler-plugin 3.1 - 1.8 - 1.8 + 1.7 + 1.7 UTF-8 diff --git a/proxy-client/src/main/java/org/fengfei/lanproxy/client/ClientChannelMannager.java b/proxy-client/src/main/java/org/fengfei/lanproxy/client/ClientChannelMannager.java index 269d494..6a3e3a1 100644 --- a/proxy-client/src/main/java/org/fengfei/lanproxy/client/ClientChannelMannager.java +++ b/proxy-client/src/main/java/org/fengfei/lanproxy/client/ClientChannelMannager.java @@ -4,12 +4,17 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.fengfei.lanproxy.client.listener.ProxyChannelBorrowListener; +import org.fengfei.lanproxy.common.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.util.AttributeKey; @@ -24,32 +29,68 @@ public class ClientChannelMannager { private static Logger logger = LoggerFactory.getLogger(ClientChannelMannager.class); - private static final AttributeKey USER_ID = AttributeKey.newInstance("user_id"); + private static final AttributeKey USER_CHANNEL_WRITEABLE = AttributeKey.newInstance("user_channel_writeable"); - private static final AttributeKey USER_CHANNEL_WRITEABLE = AttributeKey - .newInstance("user_channel_writeable"); + private static final AttributeKey CLIENT_CHANNEL_WRITEABLE = AttributeKey.newInstance("client_channel_writeable"); - private static final AttributeKey CLIENT_CHANNEL_WRITEABLE = AttributeKey - .newInstance("client_channel_writeable"); + private static final int MAX_POOL_SIZE = 100; private static Map realServerChannels = new ConcurrentHashMap(); - private static volatile Channel channel; + private static ConcurrentLinkedQueue proxyChannelPool = new ConcurrentLinkedQueue(); - public static void setChannel(Channel channel) { - ClientChannelMannager.channel = channel; + private static volatile Channel cmdChannel; + + private static Config config = Config.getInstance(); + + public static void borrowProxyChanel(Bootstrap bootstrap, final ProxyChannelBorrowListener borrowListener) { + Channel channel = proxyChannelPool.poll(); + if (channel != null) { + borrowListener.success(channel); + return; + } + + bootstrap.connect(config.getStringValue("server.host"), config.getIntValue("server.port")).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + borrowListener.success(future.channel()); + } else { + logger.warn("connect proxy server failed", future.cause()); + borrowListener.error(future.cause()); + } + } + }); } - public static Channel getChannel() { - return channel; + public static void returnProxyChanel(Channel proxyChanel) { + if (proxyChannelPool.size() > MAX_POOL_SIZE) { + proxyChanel.close(); + } else { + proxyChannelPool.offer(proxyChanel); + logger.debug("return ProxyChanel to the pool, channel is {}, pool size is {} ", proxyChanel, proxyChannelPool.size()); + } + } + + public static void removeProxyChanel(Channel proxyChanel) { + proxyChannelPool.remove(proxyChanel); + } + + public static void setCmdChannel(Channel cmdChannel) { + ClientChannelMannager.cmdChannel = cmdChannel; + } + + public static Channel getCmdChannel() { + return cmdChannel; } public static void setRealServerChannelUserId(Channel realServerChannel, String userId) { - realServerChannel.attr(USER_ID).set(userId); + realServerChannel.attr(Constants.USER_ID).set(userId); } public static String getRealServerChannelUserId(Channel realServerChannel) { - return realServerChannel.attr(USER_ID).get(); + return realServerChannel.attr(Constants.USER_ID).get(); } public static Channel getRealServerChannel(String userId) { @@ -65,8 +106,7 @@ public class ClientChannelMannager { } public static boolean isRealServerReadable(Channel realServerChannel) { - return realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() - && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get(); + return realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get(); } public static void setRealServerChannelReadability(Channel realServerChannel, Boolean client, Boolean user) { @@ -83,21 +123,22 @@ public class ClientChannelMannager { realServerChannel.attr(USER_CHANNEL_WRITEABLE).set(user); } - if (realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() - && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get()) { + if (realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get()) { realServerChannel.config().setOption(ChannelOption.AUTO_READ, true); } else { realServerChannel.config().setOption(ChannelOption.AUTO_READ, false); } } - public static void notifyChannelWritabilityChanged(Channel channel) { - logger.debug("channel writability changed, {}", channel.isWritable()); - Iterator> entryIte = realServerChannels.entrySet().iterator(); - while (entryIte.hasNext()) { - setRealServerChannelReadability(entryIte.next().getValue(), channel.isWritable(), null); - } - } + // public static void notifyChannelWritabilityChanged(Channel channel) { + // logger.debug("channel writability changed, {}", channel.isWritable()); + // Iterator> entryIte = + // realServerChannels.entrySet().iterator(); + // while (entryIte.hasNext()) { + // setRealServerChannelReadability(entryIte.next().getValue(), + // channel.isWritable(), null); + // } + // } public static void clearRealServerChannels() { logger.warn("channel closed, clear real server channels"); diff --git a/proxy-client/src/main/java/org/fengfei/lanproxy/client/Constants.java b/proxy-client/src/main/java/org/fengfei/lanproxy/client/Constants.java new file mode 100644 index 0000000..7df87ad --- /dev/null +++ b/proxy-client/src/main/java/org/fengfei/lanproxy/client/Constants.java @@ -0,0 +1,13 @@ +package org.fengfei.lanproxy.client; + +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; + +public interface Constants { + + public static final AttributeKey NEXT_CHANNEL = AttributeKey.newInstance("nxt_channel"); + + public static final AttributeKey USER_ID = AttributeKey.newInstance("user_id"); + + public static final AttributeKey CLIENT_KEY = AttributeKey.newInstance("client_key"); +} diff --git a/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java b/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java index 192703d..8d5a117 100644 --- a/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java +++ b/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java @@ -85,7 +85,7 @@ public class ProxyClientContainer implements Container, ChannelStatusListener { ch.pipeline().addLast(new ProxyMessageDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); ch.pipeline().addLast(new ProxyMessageEncoder()); ch.pipeline().addLast(new IdleCheckHandler(IdleCheckHandler.READ_IDLE_TIME, IdleCheckHandler.WRITE_IDLE_TIME, 0)); - ch.pipeline().addLast(new ClientChannelHandler(realServerBootstrap, ProxyClientContainer.this)); + ch.pipeline().addLast(new ClientChannelHandler(realServerBootstrap, bootstrap, ProxyClientContainer.this)); } }); } @@ -110,7 +110,7 @@ public class ProxyClientContainer implements Container, ChannelStatusListener { if (future.isSuccess()) { // 连接成功,向服务器发送客户端认证信息(clientKey) - ClientChannelMannager.setChannel(future.channel()); + ClientChannelMannager.setCmdChannel(future.channel()); ProxyMessage proxyMessage = new ProxyMessage(); proxyMessage.setType(ProxyMessage.TYPE_AUTH); proxyMessage.setUri(config.getStringValue("client.key")); diff --git a/proxy-client/src/main/java/org/fengfei/lanproxy/client/handlers/ClientChannelHandler.java b/proxy-client/src/main/java/org/fengfei/lanproxy/client/handlers/ClientChannelHandler.java index 7ffafef..e2b2956 100644 --- a/proxy-client/src/main/java/org/fengfei/lanproxy/client/handlers/ClientChannelHandler.java +++ b/proxy-client/src/main/java/org/fengfei/lanproxy/client/handlers/ClientChannelHandler.java @@ -1,7 +1,10 @@ package org.fengfei.lanproxy.client.handlers; import org.fengfei.lanproxy.client.ClientChannelMannager; +import org.fengfei.lanproxy.client.Constants; import org.fengfei.lanproxy.client.listener.ChannelStatusListener; +import org.fengfei.lanproxy.client.listener.ProxyChannelBorrowListener; +import org.fengfei.lanproxy.common.Config; import org.fengfei.lanproxy.protocol.ProxyMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,10 +29,13 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler NEXT_CHANNEL = AttributeKey.newInstance("nxt_channel"); + + public static final AttributeKey USER_ID = AttributeKey.newInstance("user_id"); + + public static final AttributeKey CLIENT_KEY = AttributeKey.newInstance("client_key"); +} diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java index cd6d77a..3832ae2 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java @@ -31,23 +31,19 @@ public class ProxyChannelManager { private static final AttributeKey> USER_CHANNELS = AttributeKey.newInstance("user_channels"); - private static final AttributeKey USER_ID = AttributeKey.newInstance("user_id"); - private static final AttributeKey REQUEST_LAN_INFO = AttributeKey.newInstance("request_lan_info"); private static final AttributeKey> CHANNEL_PORT = AttributeKey.newInstance("channel_port"); private static final AttributeKey CHANNEL_CLIENT_KEY = AttributeKey.newInstance("channel_client_key"); - private static final AttributeKey PROXY_CHANNEL_WRITEABLE = AttributeKey - .newInstance("proxy_channel_writeable"); + private static final AttributeKey PROXY_CHANNEL_WRITEABLE = AttributeKey.newInstance("proxy_channel_writeable"); - private static final AttributeKey REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey - .newInstance("real_backend_server_channel_writeable"); + private static final AttributeKey REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey.newInstance("real_backend_server_channel_writeable"); - private static Map portChannelMapping = new ConcurrentHashMap(); + private static Map portCmdChannelMapping = new ConcurrentHashMap(); - private static Map proxyChannels = new ConcurrentHashMap(); + private static Map cmdChannels = new ConcurrentHashMap(); static { ProxyConfig.getInstance().addConfigChangedListener(new ConfigChangedListener() { @@ -57,7 +53,7 @@ public class ProxyChannelManager { */ @Override public synchronized void onChanged() { - Iterator> ite = proxyChannels.entrySet().iterator(); + Iterator> ite = cmdChannels.entrySet().iterator(); while (ite.hasNext()) { Channel proxyChannel = ite.next().getValue(); String clientKey = proxyChannel.attr(CHANNEL_CLIENT_KEY).get(); @@ -65,21 +61,20 @@ public class ProxyChannelManager { // 去除已经去掉的clientKey配置 Set clientKeySet = ProxyConfig.getInstance().getClientKeySet(); if (!clientKeySet.contains(clientKey)) { - removeProxyChannel(proxyChannel); + removeCmdChannel(proxyChannel); continue; } if (proxyChannel.isActive()) { - List inetPorts = new ArrayList( - ProxyConfig.getInstance().getClientInetPorts(clientKey)); + List inetPorts = new ArrayList(ProxyConfig.getInstance().getClientInetPorts(clientKey)); Set inetPortSet = new HashSet(inetPorts); List channelInetPorts = new ArrayList(proxyChannel.attr(CHANNEL_PORT).get()); - synchronized (portChannelMapping) { + synchronized (portCmdChannelMapping) { // 移除旧的连接映射关系 for (int chanelInetPort : channelInetPorts) { - Channel channel = portChannelMapping.get(chanelInetPort); + Channel channel = portCmdChannelMapping.get(chanelInetPort); if (channel == null) { continue; } @@ -89,7 +84,7 @@ public class ProxyChannelManager { if (!inetPortSet.contains(chanelInetPort)) { // 移除新配置中不包含的端口 - portChannelMapping.remove(chanelInetPort); + portCmdChannelMapping.remove(chanelInetPort); proxyChannel.attr(CHANNEL_PORT).get().remove(new Integer(chanelInetPort)); } else { @@ -101,7 +96,7 @@ public class ProxyChannelManager { // 将新配置中增加的外网端口写入到映射配置中 for (int inetPort : inetPorts) { - portChannelMapping.put(inetPort, proxyChannel); + portCmdChannelMapping.put(inetPort, proxyChannel); proxyChannel.attr(CHANNEL_PORT).get().add(inetPort); } @@ -110,12 +105,11 @@ public class ProxyChannelManager { } } - ite = proxyChannels.entrySet().iterator(); + ite = cmdChannels.entrySet().iterator(); while (ite.hasNext()) { Entry entry = ite.next(); Channel proxyChannel = entry.getValue(); - logger.info("proxyChannel config, {}, {}, {} ,{}", entry.getKey(), proxyChannel, - getUserChannels(proxyChannel).size(), proxyChannel.attr(CHANNEL_PORT).get()); + logger.info("proxyChannel config, {}, {}, {} ,{}", entry.getKey(), proxyChannel, getUserChannels(proxyChannel).size(), proxyChannel.attr(CHANNEL_PORT).get()); } } @@ -147,12 +141,12 @@ public class ProxyChannelManager { } /** - * 增加代理服务器端口与代理客户端连接的映射关系 + * 增加代理服务器端口与代理控制客户端连接的映射关系 * * @param ports * @param channel */ - public static void addProxyChannel(List ports, String clientKey, Channel channel) { + public static void addCmdChannel(List ports, String clientKey, Channel channel) { if (ports == null) { throw new IllegalArgumentException("port can not be null"); @@ -160,16 +154,16 @@ public class ProxyChannelManager { // 客户端(proxy-client)相对较少,这里同步的比较重 // 保证服务器对外端口与客户端到服务器的连接关系在临界情况时调用removeChannel(Channel channel)时不出问题 - synchronized (portChannelMapping) { + synchronized (portCmdChannelMapping) { for (int port : ports) { - portChannelMapping.put(port, channel); + portCmdChannelMapping.put(port, channel); } } channel.attr(CHANNEL_PORT).set(ports); channel.attr(CHANNEL_CLIENT_KEY).set(clientKey); channel.attr(USER_CHANNELS).set(new ConcurrentHashMap()); - proxyChannels.put(clientKey, channel); + cmdChannels.put(clientKey, channel); } /** @@ -177,28 +171,28 @@ public class ProxyChannelManager { * * @param channel */ - public static void removeProxyChannel(Channel channel) { + public static void removeCmdChannel(Channel channel) { logger.warn("channel closed, clear user channels, {}", channel); if (channel.attr(CHANNEL_PORT).get() == null) { return; } String clientKey = channel.attr(CHANNEL_CLIENT_KEY).get(); - Channel channel0 = proxyChannels.remove(clientKey); + Channel channel0 = cmdChannels.remove(clientKey); if (channel != channel0) { - proxyChannels.put(clientKey, channel); + cmdChannels.put(clientKey, channel); } List ports = channel.attr(CHANNEL_PORT).get(); for (int port : ports) { - Channel proxyChannel = portChannelMapping.remove(port); + Channel proxyChannel = portCmdChannelMapping.remove(port); if (proxyChannel == null) { continue; } // 在执行断连之前新的连接已经连上来了 if (proxyChannel != channel) { - portChannelMapping.put(port, proxyChannel); + portCmdChannelMapping.put(port, proxyChannel); } } @@ -218,12 +212,12 @@ public class ProxyChannelManager { } } - public static Channel getChannel(Integer port) { - return portChannelMapping.get(port); + public static Channel getCmdChannel(Integer port) { + return portCmdChannelMapping.get(port); } - public static Channel getProxyChannel(String clientKey) { - return proxyChannels.get(clientKey); + public static Channel getCmdChannel(String clientKey) { + return cmdChannels.get(clientKey); } /** @@ -233,12 +227,12 @@ public class ProxyChannelManager { * @param userId * @param userChannel */ - public static void addUserChannel(Channel proxyChannel, String userId, Channel userChannel) { + public static void addUserChannelToCmdChannel(Channel cmdChannel, String userId, Channel userChannel) { InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort()); - userChannel.attr(USER_ID).set(userId); + userChannel.attr(Constants.USER_ID).set(userId); userChannel.attr(REQUEST_LAN_INFO).set(lanInfo); - proxyChannel.attr(USER_CHANNELS).get().put(userId, userChannel); + cmdChannel.attr(USER_CHANNELS).get().put(userId, userChannel); } /** @@ -248,9 +242,13 @@ public class ProxyChannelManager { * @param userId * @return */ - public static Channel removeUserChannel(Channel proxyChannel, String userId) { - synchronized (proxyChannel) { - return proxyChannel.attr(USER_CHANNELS).get().remove(userId); + public static Channel removeUserChannelFromCmdChannel(Channel cmdChannel, String userId) { + if (cmdChannel.attr(USER_CHANNELS).get() == null) { + return null; + } + + synchronized (cmdChannel) { + return cmdChannel.attr(USER_CHANNELS).get().remove(userId); } } @@ -261,8 +259,8 @@ public class ProxyChannelManager { * @param userId * @return */ - public static Channel getUserChannel(Channel proxyChannel, String userId) { - return proxyChannel.attr(USER_CHANNELS).get().get(userId); + public static Channel getUserChannel(Channel cmdChannel, String userId) { + return cmdChannel.attr(USER_CHANNELS).get().get(userId); } /** @@ -272,7 +270,7 @@ public class ProxyChannelManager { * @return */ public static String getUserChannelUserId(Channel userChannel) { - return userChannel.attr(USER_ID).get(); + return userChannel.attr(Constants.USER_ID).get(); } /** @@ -286,13 +284,13 @@ public class ProxyChannelManager { } /** - * 获取代理客户端连接绑定的所有用户连接 + * 获取代理控制客户端连接绑定的所有用户连接 * - * @param proxyChannel + * @param cmdChannel * @return */ - public static Map getUserChannels(Channel proxyChannel) { - return proxyChannel.attr(USER_CHANNELS).get(); + public static Map getUserChannels(Channel cmdChannel) { + return cmdChannel.attr(USER_CHANNELS).get(); } /** @@ -302,10 +300,8 @@ public class ProxyChannelManager { * @param client * @param proxy */ - public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, - Boolean proxyChannelWriteability) { - logger.debug("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, - proxyChannelWriteability); + public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, Boolean proxyChannelWriteability) { + logger.debug("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, proxyChannelWriteability); synchronized (userChannel) { if (realBackendServerChannelWriteability != null) { userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).set(realBackendServerChannelWriteability); @@ -315,8 +311,7 @@ public class ProxyChannelManager { userChannel.attr(PROXY_CHANNEL_WRITEABLE).set(proxyChannelWriteability); } - if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() - && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) { + if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) { // 代理客户端与后端服务器连接状态均为可写时,用户连接状态为可读 userChannel.config().setOption(ChannelOption.AUTO_READ, true); @@ -326,21 +321,4 @@ public class ProxyChannelManager { } } - /** - * 代理客户端连接写状态发生变化,更新关联的所有用户连接读状态 - * - * @param proxyChannel - */ - public static void notifyProxyChannelWritabilityChanged(Channel proxyChannel) { - - Map userChannels = getUserChannels(proxyChannel); - Iterator ite = userChannels.keySet().iterator(); - - // ConcurrentHashMap支持遍历过程中增删元素,所以这里不需要与增删方法同步 - while (ite.hasNext()) { - Channel userChannel = userChannels.get(ite.next()); - setUserChannelReadability(userChannel, null, proxyChannel.isWritable()); - } - } - } diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/web/routes/RouteConfig.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/web/routes/RouteConfig.java index a29c7f5..bb05ce8 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/web/routes/RouteConfig.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/web/routes/RouteConfig.java @@ -62,8 +62,7 @@ public class RouteConfig { String auth = request.headers().get(HttpHeaders.Names.AUTHORIZATION); if (!authenticated && auth != null) { String[] authArr = auth.split(" "); - if (authArr.length == 2 && authArr[0].equals(ProxyConfig.getInstance().getConfigAdminUsername()) - && authArr[1].equals(ProxyConfig.getInstance().getConfigAdminPassword())) { + if (authArr.length == 2 && authArr[0].equals(ProxyConfig.getInstance().getConfigAdminUsername()) && authArr[1].equals(ProxyConfig.getInstance().getConfigAdminPassword())) { authenticated = true; } } @@ -83,7 +82,7 @@ public class RouteConfig { public ResponseInfo request(FullHttpRequest request) { List clients = ProxyConfig.getInstance().getClients(); for (Client client : clients) { - Channel channel = ProxyChannelManager.getProxyChannel(client.getClientKey()); + Channel channel = ProxyChannelManager.getCmdChannel(client.getClientKey()); if (channel != null) { client.setStatus(1);// online } else { @@ -138,8 +137,7 @@ public class RouteConfig { return ResponseInfo.build(ResponseInfo.CODE_INVILID_PARAMS, "Error username or password"); } - if (username.equals(ProxyConfig.getInstance().getConfigAdminUsername()) - && password.equals(ProxyConfig.getInstance().getConfigAdminPassword())) { + if (username.equals(ProxyConfig.getInstance().getConfigAdminUsername()) && password.equals(ProxyConfig.getInstance().getConfigAdminPassword())) { token = UUID.randomUUID().toString().replace("-", ""); return ResponseInfo.build(token); } diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java index 99d539d..6eb110e 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java @@ -3,6 +3,7 @@ package org.fengfei.lanproxy.server.handlers; import java.util.List; import org.fengfei.lanproxy.protocol.ProxyMessage; +import org.fengfei.lanproxy.server.Constants; import org.fengfei.lanproxy.server.ProxyChannelManager; import org.fengfei.lanproxy.server.config.ProxyConfig; import org.slf4j.Logger; @@ -28,26 +29,26 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler channel, {}, {}, {}", clientKey, ports, ctx.channel()); - ProxyChannelManager.addProxyChannel(ports, clientKey, ctx.channel()); + ProxyChannelManager.addCmdChannel(ports, clientKey, ctx.channel()); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - ProxyChannelManager.notifyProxyChannelWritabilityChanged(ctx.channel()); + Channel userChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get(); + if (userChannel != null) { + ProxyChannelManager.setUserChannelReadability(userChannel, null, ctx.channel().isWritable()); + } + super.channelWritabilityChanged(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - ProxyChannelManager.removeProxyChannel(ctx.channel()); + Channel userChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get(); + if (userChannel != null && userChannel.isActive()) { + String clientKey = ctx.channel().attr(Constants.CLIENT_KEY).get(); + String userId = ctx.channel().attr(Constants.USER_ID).get(); + Channel cmdChannel = ProxyChannelManager.getCmdChannel(clientKey); + ProxyChannelManager.removeUserChannelFromCmdChannel(cmdChannel, userId); + + // 数据发送完成后再关闭连接,解决http1.0数据传输问题 + userChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + userChannel.close(); + } else { + ProxyChannelManager.removeCmdChannel(ctx.channel()); + } + super.channelInactive(ctx); } diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java index 988cc71..710f28a 100755 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicLong; import org.fengfei.lanproxy.protocol.ProxyMessage; +import org.fengfei.lanproxy.server.Constants; import org.fengfei.lanproxy.server.ProxyChannelManager; import org.fengfei.lanproxy.server.config.ProxyConfig; @@ -31,8 +32,7 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { // 通知代理客户端 Channel userChannel = ctx.channel(); - InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); - Channel proxyChannel = ProxyChannelManager.getChannel(sa.getPort()); + Channel proxyChannel = userChannel.attr(Constants.NEXT_CHANNEL).get(); if (proxyChannel == null) { // 该端口还没有代理客户端 @@ -53,9 +53,9 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel userChannel = ctx.channel(); InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); - Channel proxyChannel = ProxyChannelManager.getChannel(sa.getPort()); + Channel cmdChannel = ProxyChannelManager.getCmdChannel(sa.getPort()); - if (proxyChannel == null) { + if (cmdChannel == null) { // 该端口还没有代理客户端 ctx.channel().close(); @@ -63,14 +63,15 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { String userId = newUserId(); String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort()); // 用户连接到代理服务器时,设置用户连接不可读,等待代理后端服务器连接成功后再改变为可读状态 - ProxyChannelManager.setUserChannelReadability(userChannel, false, proxyChannel.isWritable()); - ProxyChannelManager.addUserChannel(proxyChannel, userId, userChannel); + ProxyChannelManager.setUserChannelReadability(userChannel, false, false); + ProxyChannelManager.addUserChannelToCmdChannel(cmdChannel, userId, userChannel); ProxyMessage proxyMessage = new ProxyMessage(); proxyMessage.setType(ProxyMessage.TYPE_CONNECT); proxyMessage.setUri(userId); proxyMessage.setData(lanInfo.getBytes()); - proxyChannel.writeAndFlush(proxyMessage); + cmdChannel.writeAndFlush(proxyMessage); } + super.channelActive(ctx); } @@ -80,21 +81,30 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { // 通知代理客户端 Channel userChannel = ctx.channel(); InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); - Channel proxyChannel = ProxyChannelManager.getChannel(sa.getPort()); - if (proxyChannel == null) { + Channel cmdChannel = ProxyChannelManager.getCmdChannel(sa.getPort()); + if (cmdChannel == null) { // 该端口还没有代理客户端 ctx.channel().close(); } else { - // 通知代理客户端,用户连接已经断开 + // 用户连接断开,从控制连接中移除 String userId = ProxyChannelManager.getUserChannelUserId(userChannel); - ProxyMessage proxyMessage = new ProxyMessage(); - proxyMessage.setType(ProxyMessage.TYPE_DISCONNECT); - proxyMessage.setUri(userId); - proxyChannel.writeAndFlush(proxyMessage); - ProxyChannelManager.removeUserChannel(proxyChannel, userId); + ProxyChannelManager.removeUserChannelFromCmdChannel(cmdChannel, userId); + Channel proxyChannel = userChannel.attr(Constants.NEXT_CHANNEL).get(); + if (proxyChannel != null && proxyChannel.isActive()) { + proxyChannel.attr(Constants.NEXT_CHANNEL).remove(); + proxyChannel.attr(Constants.CLIENT_KEY).remove(); + proxyChannel.attr(Constants.USER_ID).remove(); + + // 通知客户端,用户连接已经断开 + ProxyMessage proxyMessage = new ProxyMessage(); + proxyMessage.setType(ProxyMessage.TYPE_DISCONNECT); + proxyMessage.setUri(userId); + proxyChannel.writeAndFlush(proxyMessage); + } } + super.channelInactive(ctx); } @@ -104,8 +114,8 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { // 通知代理客户端 Channel userChannel = ctx.channel(); InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); - Channel proxyChannel = ProxyChannelManager.getChannel(sa.getPort()); - if (proxyChannel == null) { + Channel cmdChannel = ProxyChannelManager.getCmdChannel(sa.getPort()); + if (cmdChannel == null) { // 该端口还没有代理客户端 ctx.channel().close(); @@ -117,7 +127,7 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { proxyMessage.setType(ProxyMessage.TYPE_WRITE_CONTROL); proxyMessage.setUri(userId); proxyMessage.setData(userChannel.isWritable() ? new byte[] { 0x01 } : new byte[] { 0x00 }); - proxyChannel.writeAndFlush(proxyMessage); + cmdChannel.writeAndFlush(proxyMessage); } super.channelWritabilityChanged(ctx); diff --git a/proxy-server/src/test/resources/config.properties b/proxy-server/src/test/resources/config.properties index 1ac6c79..0ba14b8 100644 --- a/proxy-server/src/test/resources/config.properties +++ b/proxy-server/src/test/resources/config.properties @@ -10,6 +10,6 @@ server.ssl.keyManagerPassword=123456 server.ssl.needsClientAuth=false config.server.bind=0.0.0.0 -config.server.port=8090 +config.server.port=8099 config.admin.username=admin config.admin.password=admin \ No newline at end of file diff --git a/proxy-server/src/test/resources/log4j.properties b/proxy-server/src/test/resources/log4j.properties index 04a8097..f421762 100644 --- a/proxy-server/src/test/resources/log4j.properties +++ b/proxy-server/src/test/resources/log4j.properties @@ -1,4 +1,4 @@ -log4j.rootLogger=info,stdout +log4j.rootLogger=debug,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout