master
fengfei 7 years ago
parent 80bb7c8d5e
commit aea4b428c2
  1. 34
      proxy-server/src/main/java/org/fengfei/lanproxy/server/ProxyChannelManager.java

@ -39,11 +39,9 @@ public class ProxyChannelManager {
private static final AttributeKey<String> CHANNEL_CLIENT_KEY = AttributeKey.newInstance("channel_client_key"); private static final AttributeKey<String> CHANNEL_CLIENT_KEY = AttributeKey.newInstance("channel_client_key");
private static final AttributeKey<Boolean> PROXY_CHANNEL_WRITEABLE = AttributeKey private static final AttributeKey<Boolean> PROXY_CHANNEL_WRITEABLE = AttributeKey.newInstance("proxy_channel_writeable");
.newInstance("proxy_channel_writeable");
private static final AttributeKey<Boolean> REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey private static final AttributeKey<Boolean> REAL_BACKEND_SERVER_CHANNEL_WRITEABLE = AttributeKey.newInstance("real_backend_server_channel_writeable");
.newInstance("real_backend_server_channel_writeable");
private static Map<Integer, Channel> portChannelMapping = new ConcurrentHashMap<Integer, Channel>(); private static Map<Integer, Channel> portChannelMapping = new ConcurrentHashMap<Integer, Channel>();
@ -58,17 +56,8 @@ public class ProxyChannelManager {
@Override @Override
public synchronized void onChanged() { public synchronized void onChanged() {
Iterator<Entry<String, Channel>> ite = proxyChannels.entrySet().iterator(); Iterator<Entry<String, Channel>> ite = proxyChannels.entrySet().iterator();
Set<Channel> proxyChannelSet = new HashSet<Channel>();
while (ite.hasNext()) { while (ite.hasNext()) {
Channel proxyChannel = ite.next().getValue(); Channel proxyChannel = ite.next().getValue();
// 因为不同的外网端口可能映射了相同的内部代理连接,相同代理连接处理一次即可
if (proxyChannelSet.contains(proxyChannel)) {
continue;
}
proxyChannelSet.add(proxyChannel);
String clientKey = proxyChannel.attr(CHANNEL_CLIENT_KEY).get(); String clientKey = proxyChannel.attr(CHANNEL_CLIENT_KEY).get();
// 去除已经去掉的clientKey配置 // 去除已经去掉的clientKey配置
@ -79,8 +68,7 @@ public class ProxyChannelManager {
} }
if (proxyChannel.isActive()) { if (proxyChannel.isActive()) {
List<Integer> inetPorts = new ArrayList<Integer>( List<Integer> inetPorts = new ArrayList<Integer>(ProxyConfig.getInstance().getClientInetPorts(clientKey));
ProxyConfig.getInstance().getClientInetPorts(clientKey));
Set<Integer> inetPortSet = new HashSet<Integer>(inetPorts); Set<Integer> inetPortSet = new HashSet<Integer>(inetPorts);
List<Integer> channelInetPorts = new ArrayList<Integer>(proxyChannel.attr(CHANNEL_PORT).get()); List<Integer> channelInetPorts = new ArrayList<Integer>(proxyChannel.attr(CHANNEL_PORT).get());
@ -89,7 +77,7 @@ public class ProxyChannelManager {
// 移除旧的连接映射关系 // 移除旧的连接映射关系
for (int chanelInetPort : channelInetPorts) { for (int chanelInetPort : channelInetPorts) {
Channel channel = portChannelMapping.get(chanelInetPort); Channel channel = portChannelMapping.get(chanelInetPort);
if (proxyChannel == null) { if (channel == null) {
continue; continue;
} }
@ -123,8 +111,7 @@ public class ProxyChannelManager {
while (ite.hasNext()) { while (ite.hasNext()) {
Entry<String, Channel> entry = ite.next(); Entry<String, Channel> entry = ite.next();
Channel proxyChannel = entry.getValue(); Channel proxyChannel = entry.getValue();
logger.info("proxyChannel config, {}, {}, {} ,{}", entry.getKey(), proxyChannel, logger.info("proxyChannel config, {}, {}, {} ,{}", entry.getKey(), proxyChannel, getUserChannels(proxyChannel).size(), proxyChannel.attr(CHANNEL_PORT).get());
getUserChannels(proxyChannel).size(), proxyChannel.attr(CHANNEL_PORT).get());
} }
} }
@ -170,8 +157,6 @@ public class ProxyChannelManager {
// 客户端(proxy-client)相对较少,这里同步的比较重 // 客户端(proxy-client)相对较少,这里同步的比较重
// 保证服务器对外端口与客户端到服务器的连接关系在临界情况时调用removeChannel(Channel channel)时不出问题 // 保证服务器对外端口与客户端到服务器的连接关系在临界情况时调用removeChannel(Channel channel)时不出问题
synchronized (portChannelMapping) { synchronized (portChannelMapping) {
//
for (int port : ports) { for (int port : ports) {
portChannelMapping.put(port, channel); portChannelMapping.put(port, channel);
} }
@ -309,10 +294,8 @@ public class ProxyChannelManager {
* @param client * @param client
* @param proxy * @param proxy
*/ */
public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, public static void setUserChannelReadability(Channel userChannel, Boolean realBackendServerChannelWriteability, Boolean proxyChannelWriteability) {
Boolean proxyChannelWriteability) { logger.info("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability, proxyChannelWriteability);
logger.info("update user channel readability, {} {} {}", userChannel, realBackendServerChannelWriteability,
proxyChannelWriteability);
synchronized (userChannel) { synchronized (userChannel) {
if (realBackendServerChannelWriteability != null) { if (realBackendServerChannelWriteability != null) {
userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).set(realBackendServerChannelWriteability); userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).set(realBackendServerChannelWriteability);
@ -322,8 +305,7 @@ public class ProxyChannelManager {
userChannel.attr(PROXY_CHANNEL_WRITEABLE).set(proxyChannelWriteability); userChannel.attr(PROXY_CHANNEL_WRITEABLE).set(proxyChannelWriteability);
} }
if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() if (userChannel.attr(REAL_BACKEND_SERVER_CHANNEL_WRITEABLE).get() && userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) {
&& userChannel.attr(PROXY_CHANNEL_WRITEABLE).get()) {
// 代理客户端与后端服务器连接状态均为可写时,用户连接状态为可读 // 代理客户端与后端服务器连接状态均为可写时,用户连接状态为可读
userChannel.config().setOption(ChannelOption.AUTO_READ, true); userChannel.config().setOption(ChannelOption.AUTO_READ, true);

Loading…
Cancel
Save