From c6f01580e6028bf6e4ee4a8dfaa4b281185d1869 Mon Sep 17 00:00:00 2001 From: fengfei Date: Sun, 13 Aug 2017 23:11:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9C=A8=E7=BA=BF=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lanproxy/client/ProxyClientContainer.java | 70 +++++---- .../src/main/resources/config.properties | 2 +- .../src/test/resources/config.properties | 4 +- .../src/test/resources/log4j.properties | 2 +- .../common/ConcurrentHashMapExample.java | 51 ------- .../lanproxy/server/ProxyChannelManager.java | 141 +++++++++++++++--- .../lanproxy/server/config/ProxyConfig.java | 15 +- .../server/handlers/ServerChannelHandler.java | 44 +++--- .../server/handlers/UserChannelHandler.java | 4 +- .../src/main/resources/config.properties | 2 +- .../src/test/resources/config.properties | 2 +- 11 files changed, 204 insertions(+), 133 deletions(-) delete mode 100644 proxy-common/src/main/java/org/fengfei/lanproxy/common/ConcurrentHashMapExample.java diff --git a/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java b/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java index 73b82ec..192703d 100644 --- a/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java +++ b/proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java @@ -53,6 +53,8 @@ public class ProxyClientContainer implements Container, ChannelStatusListener { private SSLContext sslContext; + private long sleepTimeMill = 1000; + public ProxyClientContainer() { workerGroup = new NioEventLoopGroup(); realServerBootstrap = new Bootstrap(); @@ -80,11 +82,9 @@ public class ProxyClientContainer implements Container, ChannelStatusListener { ch.pipeline().addLast(createSslHandler(sslContext)); } - ch.pipeline().addLast(new ProxyMessageDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, - LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); + ch.pipeline().addLast(new ProxyMessageDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); ch.pipeline().addLast(new ProxyMessageEncoder()); - ch.pipeline().addLast( - new IdleCheckHandler(IdleCheckHandler.READ_IDLE_TIME, IdleCheckHandler.WRITE_IDLE_TIME, 0)); + ch.pipeline().addLast(new IdleCheckHandler(IdleCheckHandler.READ_IDLE_TIME, IdleCheckHandler.WRITE_IDLE_TIME, 0)); ch.pipeline().addLast(new ClientChannelHandler(realServerBootstrap, ProxyClientContainer.this)); } }); @@ -103,29 +103,29 @@ public class ProxyClientContainer implements Container, ChannelStatusListener { private void connectProxyServer() { - bootstrap.connect(config.getStringValue("server.host"), config.getIntValue("server.port")) - .addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - - // 连接成功,向服务器发送客户端认证信息(clientKey) - ClientChannelMannager.setChannel(future.channel()); - ProxyMessage proxyMessage = new ProxyMessage(); - proxyMessage.setType(ProxyMessage.TYPE_AUTH); - proxyMessage.setUri(config.getStringValue("client.key")); - future.channel().writeAndFlush(proxyMessage); - logger.info("connect proxy server success, {}", future.channel()); - } else { - logger.warn("connect proxy server failed", future.cause()); - - // 连接失败,延时1秒发起重连 - Thread.sleep(1000); - connectProxyServer(); - } - } - }); + bootstrap.connect(config.getStringValue("server.host"), config.getIntValue("server.port")).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + + // 连接成功,向服务器发送客户端认证信息(clientKey) + ClientChannelMannager.setChannel(future.channel()); + ProxyMessage proxyMessage = new ProxyMessage(); + proxyMessage.setType(ProxyMessage.TYPE_AUTH); + proxyMessage.setUri(config.getStringValue("client.key")); + future.channel().writeAndFlush(proxyMessage); + sleepTimeMill = 1000; + logger.info("connect proxy server success, {}", future.channel()); + } else { + logger.warn("connect proxy server failed", future.cause()); + + // 连接失败,发起重连 + reconnectWait(); + connectProxyServer(); + } + } + }); } @Override @@ -135,12 +135,22 @@ public class ProxyClientContainer implements Container, ChannelStatusListener { @Override public void channelInactive(ChannelHandlerContext ctx) { + reconnectWait(); + connectProxyServer(); + } + + private void reconnectWait() { try { - Thread.sleep(1000); + if (sleepTimeMill > 60000) { + sleepTimeMill = 1000; + } + + synchronized (this) { + sleepTimeMill = sleepTimeMill * 2; + wait(sleepTimeMill); + } } catch (InterruptedException e) { } - - connectProxyServer(); } public static void main(String[] args) { diff --git a/proxy-client/src/main/resources/config.properties b/proxy-client/src/main/resources/config.properties index 2005842..b48b6e2 100644 --- a/proxy-client/src/main/resources/config.properties +++ b/proxy-client/src/main/resources/config.properties @@ -5,5 +5,5 @@ ssl.keyStorePassword=123456 server.host=127.0.0.1 -#default ssl port is 8883 +#default ssl port is 4993 server.port=4900 \ No newline at end of file diff --git a/proxy-client/src/test/resources/config.properties b/proxy-client/src/test/resources/config.properties index 6157fbc..1a56eaf 100644 --- a/proxy-client/src/test/resources/config.properties +++ b/proxy-client/src/test/resources/config.properties @@ -5,5 +5,5 @@ ssl.keyStorePassword=123456 server.host=127.0.0.1 -#default ssl port is 8883, none ssl port is 4900 -server.port=8883 +#default ssl port is 4993 +server.port=4993 diff --git a/proxy-client/src/test/resources/log4j.properties b/proxy-client/src/test/resources/log4j.properties index a660f66..3508d54 100644 --- a/proxy-client/src/test/resources/log4j.properties +++ b/proxy-client/src/test/resources/log4j.properties @@ -1,4 +1,4 @@ -log4j.rootLogger=debug,stdout +log4j.rootLogger=info,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout diff --git a/proxy-common/src/main/java/org/fengfei/lanproxy/common/ConcurrentHashMapExample.java b/proxy-common/src/main/java/org/fengfei/lanproxy/common/ConcurrentHashMapExample.java deleted file mode 100644 index 7971384..0000000 --- a/proxy-common/src/main/java/org/fengfei/lanproxy/common/ConcurrentHashMapExample.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.fengfei.lanproxy.common; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class ConcurrentHashMapExample { - - public static void main(String[] args) { - - // ConcurrentHashMap - Map myMap = new ConcurrentHashMap(); - myMap.put("1", "1"); - myMap.put("2", "1"); - myMap.put("3", "1"); - myMap.put("4", "1"); - myMap.put("5", "1"); - myMap.put("6", "1"); - System.out.println("ConcurrentHashMap before iterator: " + myMap); - Iterator it = myMap.keySet().iterator(); - - while (it.hasNext()) { - String key = it.next(); - - if (key.equals("3")) { - myMap.remove("3"); - } - } - System.out.println("ConcurrentHashMap after iterator: " + myMap); - - // HashMap - myMap = new HashMap(); - myMap.put("1", "1"); - myMap.put("2", "1"); - myMap.put("3", "1"); - myMap.put("4", "1"); - myMap.put("5", "1"); - myMap.put("6", "1"); - System.out.println("HashMap before iterator: " + myMap); - Iterator it1 = myMap.keySet().iterator(); - - while (it1.hasNext()) { - String key = it1.next(); - if (key.equals("3")) - myMap.put(key + "new", "new3"); - } - System.out.println("HashMap after iterator: " + myMap); - } - -} \ No newline at end of file diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java index 7413226..23472eb 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java @@ -1,8 +1,17 @@ package org.fengfei.lanproxy.server; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.util.AttributeKey; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.fengfei.lanproxy.server.config.ProxyConfig; @@ -10,10 +19,6 @@ import org.fengfei.lanproxy.server.config.ProxyConfig.ConfigChangedListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.Channel; -import io.netty.channel.ChannelOption; -import io.netty.util.AttributeKey; - /** * 代理服务连接管理(代理客户端连接+用户请求连接) * @@ -28,13 +33,15 @@ public class ProxyChannelManager { private static final AttributeKey USER_ID = AttributeKey.newInstance("user_id"); + private static final AttributeKey REQUEST_LAN_INFO = AttributeKey.newInstance("request_lan_info"); + private static final AttributeKey> CHANNEL_PORT = AttributeKey.newInstance("channel_port"); - private static final AttributeKey PROXY_CHANNEL_WRITEABLE = AttributeKey - .newInstance("proxy_channel_writeable"); + private static final AttributeKey CHANNEL_CLIENT_KEY = AttributeKey.newInstance("channel_client_key"); - private static final AttributeKey REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey - .newInstance("real_backend_server_channel_writeable"); + private static final AttributeKey PROXY_CHANNEL_WRITEABLE = AttributeKey.newInstance("proxy_channel_writeable"); + + private static final AttributeKey REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey.newInstance("real_backend_server_channel_writeable"); private static Map proxyChannels = new ConcurrentHashMap(); @@ -45,12 +52,97 @@ public class ProxyChannelManager { * 代理配置发生变化时回调 */ @Override - public void onChanged() { - Iterator ite = proxyChannels.keySet().iterator(); + public synchronized void onChanged() { + Iterator> ite = proxyChannels.entrySet().iterator(); + Set proxyChannelSet = new HashSet(); while (ite.hasNext()) { - Channel proxyChannel = proxyChannels.get(ite.next()); - if (proxyChannel != null && proxyChannel.isActive()) { + Channel proxyChannel = ite.next().getValue(); + + // 因为不同的外网端口可能映射了相同的内部代理连接,相同代理连接处理一次即可 + if (proxyChannelSet.contains(proxyChannel)) { + continue; + } + + proxyChannelSet.add(proxyChannel); + + String clientKey = proxyChannel.attr(CHANNEL_CLIENT_KEY).get(); + + // 去除已经去掉的clientKey配置 + Set clientKeySet = ProxyConfig.getInstance().getClientKeySet(); + if (!clientKeySet.contains(clientKey)) { removeProxyChannel(proxyChannel); + continue; + } + + if (proxyChannel.isActive()) { + List inetPorts = new ArrayList(ProxyConfig.getInstance().getClientInetPorts(clientKey)); + Set inetPortSet = new HashSet(inetPorts); + List channelInetPorts = new ArrayList(proxyChannel.attr(CHANNEL_PORT).get()); + + synchronized (proxyChannels) { + + // 移除旧的连接映射关系 + for (int chanelInetPort : channelInetPorts) { + Channel channel = proxyChannels.get(chanelInetPort); + if (proxyChannel == null) { + continue; + } + + // 判断是否是同一个连接对象,有可能之前已经更换成其他client的连接了 + if (proxyChannel == channel) { + if (!inetPortSet.contains(chanelInetPort)) { + + // 移除新配置中不包含的端口 + proxyChannels.remove(chanelInetPort); + proxyChannel.attr(CHANNEL_PORT).get().remove(new Integer(chanelInetPort)); + } else { + + // 端口已经在改proxyChannel中使用了 + inetPorts.remove(new Integer(chanelInetPort)); + } + } + } + + // 将新配置中增加的外网端口写入到映射配置中 + for (int inetPort : inetPorts) { + proxyChannels.put(inetPort, proxyChannel); + proxyChannel.attr(CHANNEL_PORT).get().add(inetPort); + } + + checkAndClearUserChannels(proxyChannel); + } + } + } + + ite = proxyChannels.entrySet().iterator(); + while (ite.hasNext()) { + Entry entry = ite.next(); + Channel proxyChannel = entry.getValue(); + logger.info("proxyChannel config, {}, {}, {} ,{}", entry.getKey(), proxyChannel, getUserChannels(proxyChannel).size(), proxyChannel.attr(CHANNEL_PORT).get()); + } + } + + /** + * 检测连接配置是否与当前配置一致,不一致则关闭 + * + * @param proxyChannel + */ + private void checkAndClearUserChannels(Channel proxyChannel) { + Map userChannels = getUserChannels(proxyChannel); + Iterator> userChannelIte = userChannels.entrySet().iterator(); + while (userChannelIte.hasNext()) { + Entry entry = userChannelIte.next(); + Channel userChannel = entry.getValue(); + String requestLanInfo = getUserChannelRequestLanInfo(userChannel); + InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); + String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort()); + + // 判断当前配置中对应外网端口的lan信息是否与正在运行的连接中的lan信息是否一致 + if (lanInfo == null || !lanInfo.equals(requestLanInfo)) { + userChannel.close(); + + // ConcurrentHashMap不会报ConcurrentModificationException异常 + userChannels.remove(entry.getKey()); } } } @@ -63,7 +155,7 @@ public class ProxyChannelManager { * @param ports * @param channel */ - public static void addProxyChannel(List ports, Channel channel) { + public static void addProxyChannel(List ports, String clientKey, Channel channel) { if (ports == null) { throw new IllegalArgumentException("port can not be null"); @@ -80,6 +172,7 @@ public class ProxyChannelManager { } channel.attr(CHANNEL_PORT).set(ports); + channel.attr(CHANNEL_CLIENT_KEY).set(clientKey); channel.attr(USER_CHANNELS).set(new ConcurrentHashMap()); } @@ -135,7 +228,10 @@ public class ProxyChannelManager { * @param userChannel */ public static void addUserChannel(Channel proxyChannel, String userId, Channel userChannel) { + InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress(); + String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort()); userChannel.attr(USER_ID).set(userId); + userChannel.attr(REQUEST_LAN_INFO).set(lanInfo); proxyChannel.attr(USER_CHANNELS).get().put(userId, userChannel); } @@ -173,6 +269,16 @@ public class ProxyChannelManager { return userChannel.attr(USER_ID).get(); } + /** + * 获取用户请求的内网IP端口信息 + * + * @param userChannel + * @return + */ + public static String getUserChannelRequestLanInfo(Channel userChannel) { + return userChannel.attr(REQUEST_LAN_INFO).get(); + } + /** * 获取代理客户端连接绑定的所有用户连接 * @@ -190,10 +296,8 @@ public class ProxyChannelManager { * @param client * @param proxy */ - public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, - Boolean proxyChannelWriteability) { - logger.info("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, - proxyChannelWriteability); + public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, Boolean proxyChannelWriteability) { + logger.info("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, proxyChannelWriteability); synchronized (userChannel) { if (realBackendServerChannelWriteability != null) { userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).set(realBackendServerChannelWriteability); @@ -203,8 +307,7 @@ public class ProxyChannelManager { userChannel.attr(PROXY_CHANNEL_WRITEABLE).set(proxyChannelWriteability); } - if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() - && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) { + if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) { // 代理客户端与后端服务器连接状态均为可写时,用户连接状态为可读 userChannel.config().setOption(ChannelOption.AUTO_READ, true); diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/ProxyConfig.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/ProxyConfig.java index c378e36..26f0e12 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/ProxyConfig.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/config/ProxyConfig.java @@ -11,6 +11,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.fengfei.lanproxy.common.Config; import org.fengfei.lanproxy.common.JsonUtil; @@ -93,9 +94,8 @@ public class ProxyConfig implements Serializable { this.configAdminUsername = Config.getInstance().getStringValue("config.admin.username"); this.configAdminPassword = Config.getInstance().getStringValue("config.admin.password"); - logger.info( - "config init serverBind {}, serverPort {}, configServerBind {}, configServerPort {}, configAdminUsername {}, configAdminPassword {}", - serverBind, serverPort, configServerBind, configServerPort, configAdminUsername, configAdminPassword); + logger.info("config init serverBind {}, serverPort {}, configServerBind {}, configServerPort {}, configAdminUsername {}, configAdminPassword {}", serverBind, serverPort, configServerBind, + configServerPort, configAdminUsername, configAdminPassword); update(null); } @@ -253,6 +253,15 @@ public class ProxyConfig implements Serializable { return clientInetPortMapping.get(clientKey); } + /** + * 获取所有的clientKey + * + * @return + */ + public Set getClientKeySet() { + return clientInetPortMapping.keySet(); + } + /** * 根据代理服务器端口获取后端服务器代理信息 * diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java index 723ba0a..5608289 100644 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java @@ -26,26 +26,26 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler ports = ProxyConfig.getInstance().getClientInetPorts(clientKey); if (ports == null) { logger.info("error clientKey {}, {}", clientKey, ctx.channel()); - // ctx.channel().close(); + ctx.channel().close(); return; } logger.info("set port => channel, {} {}", clientKey, ports); - ProxyChannelManager.addProxyChannel(ports, ctx.channel()); + ProxyChannelManager.addProxyChannel(ports, clientKey, ctx.channel()); } @Override diff --git a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java index 828554b..988cc71 100755 --- a/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java +++ b/proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java @@ -61,14 +61,14 @@ public class UserChannelHandler extends SimpleChannelInboundHandler { ctx.channel().close(); } else { String userId = newUserId(); - + String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort()); // 用户连接到代理服务器时,设置用户连接不可读,等待代理后端服务器连接成功后再改变为可读状态 ProxyChannelManager.setUserChannelReadability(userChannel, false, proxyChannel.isWritable()); ProxyChannelManager.addUserChannel(proxyChannel, userId, userChannel); ProxyMessage proxyMessage = new ProxyMessage(); proxyMessage.setType(ProxyMessage.TYPE_CONNECT); proxyMessage.setUri(userId); - proxyMessage.setData(ProxyConfig.getInstance().getLanInfo(sa.getPort()).getBytes()); + proxyMessage.setData(lanInfo.getBytes()); proxyChannel.writeAndFlush(proxyMessage); } super.channelActive(ctx); diff --git a/proxy-server/src/main/resources/config.properties b/proxy-server/src/main/resources/config.properties index 7f3ac04..e557e5b 100644 --- a/proxy-server/src/main/resources/config.properties +++ b/proxy-server/src/main/resources/config.properties @@ -3,7 +3,7 @@ server.port=4900 server.ssl.enable=false server.ssl.bind=0.0.0.0 -server.ssl.port=8883 +server.ssl.port=4993 server.ssl.jksPath=test.jks server.ssl.keyStorePassword=123456 server.ssl.keyManagerPassword=123456 diff --git a/proxy-server/src/test/resources/config.properties b/proxy-server/src/test/resources/config.properties index 674ed47..7ff2c85 100644 --- a/proxy-server/src/test/resources/config.properties +++ b/proxy-server/src/test/resources/config.properties @@ -3,7 +3,7 @@ server.port=4900 server.ssl.enable=true server.ssl.bind=0.0.0.0 -server.ssl.port=8883 +server.ssl.port=4993 server.ssl.jksPath=test.jks server.ssl.keyStorePassword=123456 server.ssl.keyManagerPassword=123456