优化在线配置功能

master
fengfei 7 years ago
parent e51f9c5010
commit c6f01580e6
  1. 32
      proxy-client/src/main/java/org/fengfei/lanproxy/client/ProxyClientContainer.java
  2. 2
      proxy-client/src/main/resources/config.properties
  3. 4
      proxy-client/src/test/resources/config.properties
  4. 2
      proxy-client/src/test/resources/log4j.properties
  5. 51
      proxy-common/src/main/java/org/fengfei/lanproxy/common/ConcurrentHashMapExample.java
  6. 141
      proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java
  7. 15
      proxy-server/src/main/java/org/fengfei/lanproxy/server/config/ProxyConfig.java
  8. 4
      proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java
  9. 4
      proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java
  10. 2
      proxy-server/src/main/resources/config.properties
  11. 2
      proxy-server/src/test/resources/config.properties

@ -53,6 +53,8 @@ public class ProxyClientContainer implements Container, ChannelStatusListener {
private SSLContext sslContext;
private long sleepTimeMill = 1000;
public ProxyClientContainer() {
workerGroup = new NioEventLoopGroup();
realServerBootstrap = new Bootstrap();
@ -80,11 +82,9 @@ public class ProxyClientContainer implements Container, ChannelStatusListener {
ch.pipeline().addLast(createSslHandler(sslContext));
}
ch.pipeline().addLast(new ProxyMessageDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET,
LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
ch.pipeline().addLast(new ProxyMessageDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
ch.pipeline().addLast(new ProxyMessageEncoder());
ch.pipeline().addLast(
new IdleCheckHandler(IdleCheckHandler.READ_IDLE_TIME, IdleCheckHandler.WRITE_IDLE_TIME, 0));
ch.pipeline().addLast(new IdleCheckHandler(IdleCheckHandler.READ_IDLE_TIME, IdleCheckHandler.WRITE_IDLE_TIME, 0));
ch.pipeline().addLast(new ClientChannelHandler(realServerBootstrap, ProxyClientContainer.this));
}
});
@ -103,8 +103,7 @@ public class ProxyClientContainer implements Container, ChannelStatusListener {
private void connectProxyServer() {
bootstrap.connect(config.getStringValue("server.host"), config.getIntValue("server.port"))
.addListener(new ChannelFutureListener() {
bootstrap.connect(config.getStringValue("server.host"), config.getIntValue("server.port")).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -116,12 +115,13 @@ public class ProxyClientContainer implements Container, ChannelStatusListener {
proxyMessage.setType(ProxyMessage.TYPE_AUTH);
proxyMessage.setUri(config.getStringValue("client.key"));
future.channel().writeAndFlush(proxyMessage);
sleepTimeMill = 1000;
logger.info("connect proxy server success, {}", future.channel());
} else {
logger.warn("connect proxy server failed", future.cause());
// 连接失败,延时1秒发起重连
Thread.sleep(1000);
// 连接失败,发起重连
reconnectWait();
connectProxyServer();
}
}
@ -135,12 +135,22 @@ public class ProxyClientContainer implements Container, ChannelStatusListener {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
reconnectWait();
connectProxyServer();
}
private void reconnectWait() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
if (sleepTimeMill > 60000) {
sleepTimeMill = 1000;
}
connectProxyServer();
synchronized (this) {
sleepTimeMill = sleepTimeMill * 2;
wait(sleepTimeMill);
}
} catch (InterruptedException e) {
}
}
public static void main(String[] args) {

@ -5,5 +5,5 @@ ssl.keyStorePassword=123456
server.host=127.0.0.1
#default ssl port is 8883
#default ssl port is 4993
server.port=4900

@ -5,5 +5,5 @@ ssl.keyStorePassword=123456
server.host=127.0.0.1
#default ssl port is 8883, none ssl port is 4900
server.port=8883
#default ssl port is 4993
server.port=4993

@ -1,4 +1,4 @@
log4j.rootLogger=debug,stdout
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

@ -1,51 +0,0 @@
package org.fengfei.lanproxy.common;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
// ConcurrentHashMap
Map<String, String> myMap = new ConcurrentHashMap<String, String>();
myMap.put("1", "1");
myMap.put("2", "1");
myMap.put("3", "1");
myMap.put("4", "1");
myMap.put("5", "1");
myMap.put("6", "1");
System.out.println("ConcurrentHashMap before iterator: " + myMap);
Iterator<String> it = myMap.keySet().iterator();
while (it.hasNext()) {
String key = it.next();
if (key.equals("3")) {
myMap.remove("3");
}
}
System.out.println("ConcurrentHashMap after iterator: " + myMap);
// HashMap
myMap = new HashMap<String, String>();
myMap.put("1", "1");
myMap.put("2", "1");
myMap.put("3", "1");
myMap.put("4", "1");
myMap.put("5", "1");
myMap.put("6", "1");
System.out.println("HashMap before iterator: " + myMap);
Iterator<String> it1 = myMap.keySet().iterator();
while (it1.hasNext()) {
String key = it1.next();
if (key.equals("3"))
myMap.put(key + "new", "new3");
}
System.out.println("HashMap after iterator: " + myMap);
}
}

@ -1,8 +1,17 @@
package org.fengfei.lanproxy.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.fengfei.lanproxy.server.config.ProxyConfig;
@ -10,10 +19,6 @@ import org.fengfei.lanproxy.server.config.ProxyConfig.ConfigChangedListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.util.AttributeKey;
/**
* 代理服务连接管理代理客户端连接+用户请求连接
*
@ -28,13 +33,15 @@ public class ProxyChannelManager {
private static final AttributeKey<String> USER_ID = AttributeKey.newInstance("user_id");
private static final AttributeKey<String> REQUEST_LAN_INFO = AttributeKey.newInstance("request_lan_info");
private static final AttributeKey<List<Integer>> CHANNEL_PORT = AttributeKey.newInstance("channel_port");
private static final AttributeKey<Boolean> PROXY_CHANNEL_WRITEABLE = AttributeKey
.newInstance("proxy_channel_writeable");
private static final AttributeKey<String> CHANNEL_CLIENT_KEY = AttributeKey.newInstance("channel_client_key");
private static final AttributeKey<Boolean> PROXY_CHANNEL_WRITEABLE = AttributeKey.newInstance("proxy_channel_writeable");
private static final AttributeKey<Boolean> REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey
.newInstance("real_backend_server_channel_writeable");
private static final AttributeKey<Boolean> REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey.newInstance("real_backend_server_channel_writeable");
private static Map<Integer, Channel> proxyChannels = new ConcurrentHashMap<Integer, Channel>();
@ -45,12 +52,97 @@ public class ProxyChannelManager {
* 代理配置发生变化时回调
*/
@Override
public void onChanged() {
Iterator<Integer> ite = proxyChannels.keySet().iterator();
public synchronized void onChanged() {
Iterator<Entry<Integer, Channel>> ite = proxyChannels.entrySet().iterator();
Set<Channel> proxyChannelSet = new HashSet<Channel>();
while (ite.hasNext()) {
Channel proxyChannel = proxyChannels.get(ite.next());
if (proxyChannel != null && proxyChannel.isActive()) {
Channel proxyChannel = ite.next().getValue();
// 因为不同的外网端口可能映射了相同的内部代理连接,相同代理连接处理一次即可
if (proxyChannelSet.contains(proxyChannel)) {
continue;
}
proxyChannelSet.add(proxyChannel);
String clientKey = proxyChannel.attr(CHANNEL_CLIENT_KEY).get();
// 去除已经去掉的clientKey配置
Set<String> clientKeySet = ProxyConfig.getInstance().getClientKeySet();
if (!clientKeySet.contains(clientKey)) {
removeProxyChannel(proxyChannel);
continue;
}
if (proxyChannel.isActive()) {
List<Integer> inetPorts = new ArrayList<Integer>(ProxyConfig.getInstance().getClientInetPorts(clientKey));
Set<Integer> inetPortSet = new HashSet<Integer>(inetPorts);
List<Integer> channelInetPorts = new ArrayList<Integer>(proxyChannel.attr(CHANNEL_PORT).get());
synchronized (proxyChannels) {
// 移除旧的连接映射关系
for (int chanelInetPort : channelInetPorts) {
Channel channel = proxyChannels.get(chanelInetPort);
if (proxyChannel == null) {
continue;
}
// 判断是否是同一个连接对象,有可能之前已经更换成其他client的连接了
if (proxyChannel == channel) {
if (!inetPortSet.contains(chanelInetPort)) {
// 移除新配置中不包含的端口
proxyChannels.remove(chanelInetPort);
proxyChannel.attr(CHANNEL_PORT).get().remove(new Integer(chanelInetPort));
} else {
// 端口已经在改proxyChannel中使用了
inetPorts.remove(new Integer(chanelInetPort));
}
}
}
// 将新配置中增加的外网端口写入到映射配置中
for (int inetPort : inetPorts) {
proxyChannels.put(inetPort, proxyChannel);
proxyChannel.attr(CHANNEL_PORT).get().add(inetPort);
}
checkAndClearUserChannels(proxyChannel);
}
}
}
ite = proxyChannels.entrySet().iterator();
while (ite.hasNext()) {
Entry<Integer, Channel> entry = ite.next();
Channel proxyChannel = entry.getValue();
logger.info("proxyChannel config, {}, {}, {} ,{}", entry.getKey(), proxyChannel, getUserChannels(proxyChannel).size(), proxyChannel.attr(CHANNEL_PORT).get());
}
}
/**
* 检测连接配置是否与当前配置一致不一致则关闭
*
* @param proxyChannel
*/
private void checkAndClearUserChannels(Channel proxyChannel) {
Map<String, Channel> userChannels = getUserChannels(proxyChannel);
Iterator<Entry<String, Channel>> userChannelIte = userChannels.entrySet().iterator();
while (userChannelIte.hasNext()) {
Entry<String, Channel> entry = userChannelIte.next();
Channel userChannel = entry.getValue();
String requestLanInfo = getUserChannelRequestLanInfo(userChannel);
InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress();
String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort());
// 判断当前配置中对应外网端口的lan信息是否与正在运行的连接中的lan信息是否一致
if (lanInfo == null || !lanInfo.equals(requestLanInfo)) {
userChannel.close();
// ConcurrentHashMap不会报ConcurrentModificationException异常
userChannels.remove(entry.getKey());
}
}
}
@ -63,7 +155,7 @@ public class ProxyChannelManager {
* @param ports
* @param channel
*/
public static void addProxyChannel(List<Integer> ports, Channel channel) {
public static void addProxyChannel(List<Integer> ports, String clientKey, Channel channel) {
if (ports == null) {
throw new IllegalArgumentException("port can not be null");
@ -80,6 +172,7 @@ public class ProxyChannelManager {
}
channel.attr(CHANNEL_PORT).set(ports);
channel.attr(CHANNEL_CLIENT_KEY).set(clientKey);
channel.attr(USER_CHANNELS).set(new ConcurrentHashMap<String, Channel>());
}
@ -135,7 +228,10 @@ public class ProxyChannelManager {
* @param userChannel
*/
public static void addUserChannel(Channel proxyChannel, String userId, Channel userChannel) {
InetSocketAddress sa = (InetSocketAddress) userChannel.localAddress();
String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort());
userChannel.attr(USER_ID).set(userId);
userChannel.attr(REQUEST_LAN_INFO).set(lanInfo);
proxyChannel.attr(USER_CHANNELS).get().put(userId, userChannel);
}
@ -173,6 +269,16 @@ public class ProxyChannelManager {
return userChannel.attr(USER_ID).get();
}
/**
* 获取用户请求的内网IP端口信息
*
* @param userChannel
* @return
*/
public static String getUserChannelRequestLanInfo(Channel userChannel) {
return userChannel.attr(REQUEST_LAN_INFO).get();
}
/**
* 获取代理客户端连接绑定的所有用户连接
*
@ -190,10 +296,8 @@ public class ProxyChannelManager {
* @param client
* @param proxy
*/
public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability,
Boolean proxyChannelWriteability) {
logger.info("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability,
proxyChannelWriteability);
public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, Boolean proxyChannelWriteability) {
logger.info("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, proxyChannelWriteability);
synchronized (userChannel) {
if (realBackendServerChannelWriteability != null) {
userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).set(realBackendServerChannelWriteability);
@ -203,8 +307,7 @@ public class ProxyChannelManager {
userChannel.attr(PROXY_CHANNEL_WRITEABLE).set(proxyChannelWriteability);
}
if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get()
&& userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) {
if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) {
// 代理客户端与后端服务器连接状态均为可写时,用户连接状态为可读
userChannel.config().setOption(ChannelOption.AUTO_READ, true);

@ -11,6 +11,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.fengfei.lanproxy.common.Config;
import org.fengfei.lanproxy.common.JsonUtil;
@ -93,9 +94,8 @@ public class ProxyConfig implements Serializable {
this.configAdminUsername = Config.getInstance().getStringValue("config.admin.username");
this.configAdminPassword = Config.getInstance().getStringValue("config.admin.password");
logger.info(
"config init serverBind {}, serverPort {}, configServerBind {}, configServerPort {}, configAdminUsername {}, configAdminPassword {}",
serverBind, serverPort, configServerBind, configServerPort, configAdminUsername, configAdminPassword);
logger.info("config init serverBind {}, serverPort {}, configServerBind {}, configServerPort {}, configAdminUsername {}, configAdminPassword {}", serverBind, serverPort, configServerBind,
configServerPort, configAdminUsername, configAdminPassword);
update(null);
}
@ -253,6 +253,15 @@ public class ProxyConfig implements Serializable {
return clientInetPortMapping.get(clientKey);
}
/**
* 获取所有的clientKey
*
* @return
*/
public Set<String> getClientKeySet() {
return clientInetPortMapping.keySet();
}
/**
* 根据代理服务器端口获取后端服务器代理信息
*

@ -101,12 +101,12 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
List<Integer> ports = ProxyConfig.getInstance().getClientInetPorts(clientKey);
if (ports == null) {
logger.info("error clientKey {}, {}", clientKey, ctx.channel());
// ctx.channel().close();
ctx.channel().close();
return;
}
logger.info("set port => channel, {} {}", clientKey, ports);
ProxyChannelManager.addProxyChannel(ports, ctx.channel());
ProxyChannelManager.addProxyChannel(ports, clientKey, ctx.channel());
}
@Override

@ -61,14 +61,14 @@ public class UserChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
ctx.channel().close();
} else {
String userId = newUserId();
String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort());
// 用户连接到代理服务器时,设置用户连接不可读,等待代理后端服务器连接成功后再改变为可读状态
ProxyChannelManager.setUserChannelReadability(userChannel, false, proxyChannel.isWritable());
ProxyChannelManager.addUserChannel(proxyChannel, userId, userChannel);
ProxyMessage proxyMessage = new ProxyMessage();
proxyMessage.setType(ProxyMessage.TYPE_CONNECT);
proxyMessage.setUri(userId);
proxyMessage.setData(ProxyConfig.getInstance().getLanInfo(sa.getPort()).getBytes());
proxyMessage.setData(lanInfo.getBytes());
proxyChannel.writeAndFlush(proxyMessage);
}
super.channelActive(ctx);

@ -3,7 +3,7 @@ server.port=4900
server.ssl.enable=false
server.ssl.bind=0.0.0.0
server.ssl.port=8883
server.ssl.port=4993
server.ssl.jksPath=test.jks
server.ssl.keyStorePassword=123456
server.ssl.keyManagerPassword=123456

@ -3,7 +3,7 @@ server.port=4900
server.ssl.enable=true
server.ssl.bind=0.0.0.0
server.ssl.port=8883
server.ssl.port=4993
server.ssl.jksPath=test.jks
server.ssl.keyStorePassword=123456
server.ssl.keyManagerPassword=123456

Loading…
Cancel
Save