实现优化

master
fengfei 7 years ago
parent 661d345476
commit 1bbec8f492
  1. 1
      README.md
  2. 32
      proxy-client/src/main/java/org/fengfei/lanproxy/client/ClientChannelMannager.java
  3. 19
      proxy-client/src/main/java/org/fengfei/lanproxy/client/handlers/ClientChannelHandler.java
  4. 12
      proxy-client/src/main/java/org/fengfei/lanproxy/client/handlers/RealServerChannelHandler.java
  5. 2
      proxy-client/src/main/resources/startup.sh
  6. 2
      proxy-client/src/main/resources/stop.sh
  7. 33
      proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java
  8. 19
      proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/ServerChannelHandler.java
  9. 15
      proxy-server/src/main/java/org/fengfei/lanproxy/server/handlers/UserChannelHandler.java
  10. 2
      proxy-server/src/main/resources/startup.sh
  11. 2
      proxy-server/src/main/resources/stop.sh
  12. 2
      proxy-server/src/test/resources/log4j.properties

@ -8,6 +8,7 @@ lanproxy是一个将局域网个人电脑、服务器代理到公网的内网穿
### 相关地址
- 主页 https://lanproxy.thingsglobal.org
- lanproxy-go-client https://github.com/ffay/lanproxy-go-client
- 发布包下载地址 https://github.com/ffay/lanproxy/releases
### 实现方案

@ -16,7 +16,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.util.AttributeKey;
/**
@ -109,37 +108,6 @@ public class ClientChannelMannager {
return realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get();
}
public static void setRealServerChannelReadability(Channel realServerChannel, Boolean client, Boolean user) {
logger.debug("update real server channel readability, {} {} {}", realServerChannel, client, user);
if (realServerChannel == null) {
return;
}
if (client != null) {
realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).set(client);
}
if (user != null) {
realServerChannel.attr(USER_CHANNEL_WRITEABLE).set(user);
}
if (realServerChannel.attr(CLIENT_CHANNEL_WRITEABLE).get() && realServerChannel.attr(USER_CHANNEL_WRITEABLE).get()) {
realServerChannel.config().setOption(ChannelOption.AUTO_READ, true);
} else {
realServerChannel.config().setOption(ChannelOption.AUTO_READ, false);
}
}
// public static void notifyChannelWritabilityChanged(Channel channel) {
// logger.debug("channel writability changed, {}", channel.isWritable());
// Iterator<Entry<String, Channel>> entryIte =
// realServerChannels.entrySet().iterator();
// while (entryIte.hasNext()) {
// setRealServerChannelReadability(entryIte.next().getValue(),
// channel.isWritable(), null);
// }
// }
public static void clearRealServerChannels() {
logger.warn("channel closed, clear real server channels");

@ -16,6 +16,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
/**
@ -52,23 +53,11 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
case ProxyMessage.P_TYPE_TRANSFER:
handleTransferMessage(ctx, proxyMessage);
break;
case ProxyMessage.C_TYPE_WRITE_CONTROL:
handleWriteControlMessage(ctx, proxyMessage);
break;
default:
break;
}
}
private void handleWriteControlMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) {
String userId = proxyMessage.getUri();
Channel realServerChannel = ClientChannelMannager.getRealServerChannel(userId);
if (realServerChannel != null) {
boolean writeable = proxyMessage.getData()[0] == 0x01 ? true : false;
ClientChannelMannager.setRealServerChannelReadability(realServerChannel, null, writeable);
}
}
private void handleTransferMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) {
Channel realServerChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (realServerChannel != null) {
@ -105,7 +94,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
final Channel realServerChannel = future.channel();
logger.debug("connect realserver success, {}", realServerChannel);
ClientChannelMannager.setRealServerChannelReadability(realServerChannel, false, true);
realServerChannel.config().setOption(ChannelOption.AUTO_READ, false);
// 获取连接
ClientChannelMannager.borrowProxyChanel(proxyBootstrap, new ProxyChannelBorrowListener() {
@ -122,7 +111,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
proxyMessage.setUri(userId + "@" + Config.getInstance().getStringValue("client.key"));
channel.writeAndFlush(proxyMessage);
ClientChannelMannager.setRealServerChannelReadability(realServerChannel, true, true);
realServerChannel.config().setOption(ChannelOption.AUTO_READ, true);
ClientChannelMannager.addRealServerChannel(userId, realServerChannel);
ClientChannelMannager.setRealServerChannelUserId(realServerChannel, userId);
}
@ -150,7 +139,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel realServerChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (realServerChannel != null) {
ClientChannelMannager.setRealServerChannelReadability(realServerChannel, ctx.channel().isWritable(), null);
realServerChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
super.channelWritabilityChanged(ctx);

@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
/**
@ -63,14 +64,9 @@ public class RealServerChannelHandler extends SimpleChannelInboundHandler<ByteBu
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel realServerChannel = ctx.channel();
String userId = ClientChannelMannager.getRealServerChannelUserId(realServerChannel);
Channel channel = ClientChannelMannager.getCmdChannel();
if (channel != null) {
ProxyMessage proxyMessage = new ProxyMessage();
proxyMessage.setType(ProxyMessage.C_TYPE_WRITE_CONTROL);
proxyMessage.setUri(userId);
proxyMessage.setData(realServerChannel.isWritable() ? new byte[] { 0x01 } : new byte[] { 0x00 });
channel.writeAndFlush(proxyMessage);
Channel proxyChannel = realServerChannel.attr(Constants.NEXT_CHANNEL).get();
if (proxyChannel != null) {
proxyChannel.config().setOption(ChannelOption.AUTO_READ, realServerChannel.isWritable());
}
super.channelWritabilityChanged(ctx);

@ -7,7 +7,7 @@ LOGS_DIR=$DEPLOY_DIR/logs
APP_MAINCLASS=org.fengfei.lanproxy.client.ProxyClientContainer
PIDS=`ps -ef | grep java | grep "$CONF_DIR" |awk '{print $2}'`
PIDS=`ps -ef | grep -v grep | grep "$CONF_DIR" |awk '{print $2}'`
if [ -n "$PIDS" ]; then
echo "ERROR: already started!"
echo "PID: $PIDS"

@ -9,7 +9,7 @@ if [ ! -d $LOGS_DIR ]; then
fi
STDOUT_FILE=$LOGS_DIR/stdout.log
PID=`ps -ef | grep java | grep "$DEPLOY_DIR/conf" | awk '{print $2}'`
PID=`ps -ef | grep -v grep | grep "$DEPLOY_DIR/conf" | awk '{print $2}'`
echo "PID: $PID"
if [ -z "$PID" ]; then
echo "ERROR: The proxy client does not started!"

@ -16,7 +16,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.util.AttributeKey;
/**
@ -37,10 +36,6 @@ public class ProxyChannelManager {
private static final AttributeKey<String> CHANNEL_CLIENT_KEY = AttributeKey.newInstance("channel_client_key");
private static final AttributeKey<Boolean> PROXY_CHANNEL_WRITEABLE = AttributeKey.newInstance("proxy_channel_writeable");
private static final AttributeKey<Boolean> REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey.newInstance("real_backend_server_channel_writeable");
private static Map<Integer, Channel> portCmdChannelMapping = new ConcurrentHashMap<Integer, Channel>();
private static Map<String, Channel> cmdChannels = new ConcurrentHashMap<String, Channel>();
@ -293,32 +288,4 @@ public class ProxyChannelManager {
return cmdChannel.attr(USER_CHANNELS).get();
}
/**
* 更新用户连接是否可写状态
*
* @param userChannel
* @param client
* @param proxy
*/
public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, Boolean proxyChannelWriteability) {
logger.debug("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, proxyChannelWriteability);
synchronized (userChannel) {
if (realBackendServerChannelWriteability != null) {
userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).set(realBackendServerChannelWriteability);
}
if (proxyChannelWriteability != null) {
userChannel.attr(PROXY_CHANNEL_WRITEABLE).set(proxyChannelWriteability);
}
if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) {
// 代理客户端与后端服务器连接状态均为可写时,用户连接状态为可读
userChannel.config().setOption(ChannelOption.AUTO_READ, true);
} else {
userChannel.config().setOption(ChannelOption.AUTO_READ, false);
}
}
}
}

@ -14,6 +14,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
/**
@ -44,25 +45,11 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
case ProxyMessage.P_TYPE_TRANSFER:
handleTransferMessage(ctx, proxyMessage);
break;
case ProxyMessage.C_TYPE_WRITE_CONTROL:
handleWriteControlMessage(ctx, proxyMessage);
break;
default:
break;
}
}
private void handleWriteControlMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) {
String userId = proxyMessage.getUri();
Channel userChannel = ProxyChannelManager.getUserChannel(ctx.channel(), userId);
if (userChannel != null) {
// 同步代理客户端与后端服务器的连接可写状态
boolean writeable = proxyMessage.getData()[0] == 0x01 ? true : false;
ProxyChannelManager.setUserChannelReadability(userChannel, writeable, null);
}
}
private void handleTransferMessage(ChannelHandlerContext ctx, ProxyMessage proxyMessage) {
Channel userChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (userChannel != null) {
@ -131,7 +118,7 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
ctx.channel().attr(Constants.NEXT_CHANNEL).set(userChannel);
userChannel.attr(Constants.NEXT_CHANNEL).set(ctx.channel());
// 代理客户端与后端服务器连接成功,修改用户连接为可读状态
ProxyChannelManager.setUserChannelReadability(userChannel, true, ctx.channel().isWritable());
userChannel.config().setOption(ChannelOption.AUTO_READ, true);
}
}
@ -167,7 +154,7 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<ProxyMessa
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel userChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (userChannel != null) {
ProxyChannelManager.setUserChannelReadability(userChannel, null, ctx.channel().isWritable());
userChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
super.channelWritabilityChanged(ctx);

@ -11,6 +11,7 @@ import org.fengfei.lanproxy.server.config.ProxyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
/**
@ -63,7 +64,7 @@ public class UserChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
String userId = newUserId();
String lanInfo = ProxyConfig.getInstance().getLanInfo(sa.getPort());
// 用户连接到代理服务器时,设置用户连接不可读,等待代理后端服务器连接成功后再改变为可读状态
ProxyChannelManager.setUserChannelReadability(userChannel, false, false);
userChannel.config().setOption(ChannelOption.AUTO_READ, false);
ProxyChannelManager.addUserChannelToCmdChannel(cmdChannel, userId, userChannel);
ProxyMessage proxyMessage = new ProxyMessage();
proxyMessage.setType(ProxyMessage.TYPE_CONNECT);
@ -120,14 +121,10 @@ public class UserChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 该端口还没有代理客户端
ctx.channel().close();
} else {
// 通知代理客户端,用户连接可写状态
String userId = ProxyChannelManager.getUserChannelUserId(userChannel);
ProxyMessage proxyMessage = new ProxyMessage();
proxyMessage.setType(ProxyMessage.C_TYPE_WRITE_CONTROL);
proxyMessage.setUri(userId);
proxyMessage.setData(userChannel.isWritable() ? new byte[] { 0x01 } : new byte[] { 0x00 });
cmdChannel.writeAndFlush(proxyMessage);
Channel proxyChannel = userChannel.attr(Constants.NEXT_CHANNEL).get();
if (proxyChannel != null) {
proxyChannel.config().setOption(ChannelOption.AUTO_READ, userChannel.isWritable());
}
}
super.channelWritabilityChanged(ctx);

@ -7,7 +7,7 @@ LOGS_DIR=$DEPLOY_DIR/logs
APP_MAINCLASS=org.fengfei.lanproxy.server.ProxyServerContainer
PIDS=`ps -ef | grep java | grep "$CONF_DIR" |awk '{print $2}'`
PIDS=`ps -ef | grep -v grep | grep "$CONF_DIR" |awk '{print $2}'`
if [ -n "$PIDS" ]; then
echo "ERROR: already started!"
echo "PID: $PIDS"

@ -9,7 +9,7 @@ if [ ! -d $LOGS_DIR ]; then
fi
STDOUT_FILE=$LOGS_DIR/stdout.log
PID=`ps -ef | grep java | grep "$DEPLOY_DIR/conf" | awk '{print $2}'`
PID=`ps -ef | grep -v grep | grep "$DEPLOY_DIR/conf" | awk '{print $2}'`
echo "PID: $PID"
if [ -z "$PID" ]; then
echo "ERROR: The proxy server does not started!"

@ -1,4 +1,4 @@
log4j.rootLogger=debug,stdout
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

Loading…
Cancel
Save