Compare commits

..

1 Commits
master ... dev1

Author SHA1 Message Date
67d9fdc492 优化 2025-05-31 04:32:13 +08:00
5 changed files with 840 additions and 508 deletions

317
connector.py Normal file
View File

@ -0,0 +1,317 @@
import socket
import json
import threading
from concurrent.futures import ThreadPoolExecutor
import time
import uuid
import logging
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ServiceConnector:
def __init__(self, coordinator_addr, service_name, local_port):
"""
初始化服务连接器
:param coordinator_addr: 协调服务器地址 (IP, port)
:param service_name: 请求的服务名称
:param local_port: 本地监听端口
"""
self.coordinator_addr = coordinator_addr
self.service_name = service_name
self.local_port = local_port
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# 创建TCP套接字用于本地监听
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_sock.bind(('127.0.0.1', local_port))
self.tcp_sock.listen(5)
logger.info(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
# 注册线程池
self.thread_pool = ThreadPoolExecutor(max_workers=10)
# 存储活动连接
self.active_connections = {}
self.provider_addr = None
self.provider_id = None
self.internal_port = None
self.running = True
def request_service(self):
"""
向协调服务器请求服务
:return: 请求是否成功
"""
message = {
'action': 'request',
'service_name': self.service_name,
'client_id': self.client_id
}
logger.info(f"向协调服务器 {self.coordinator_addr} 请求服务 '{self.service_name}'")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待响应
try:
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] == 'success':
self.provider_addr = tuple(response['provider_addr'])
self.internal_port = response['internal_port']
self.provider_id = response['provider_id']
logger.info(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
return True
else:
logger.error(f"服务请求失败: {response['message']}")
return False
except Exception as e:
logger.error(f"请求服务时发生错误: {str(e)}")
return False
def punch_hole(self):
"""
执行UDP打洞
:return: 打洞是否成功
"""
if not self.provider_addr:
return False
# 请求协服务器发起打洞
message = {
'action': 'punch_request',
'client_id': self.client_id,
'provider_id': self.provider_id
}
logger.info(f"向协调服务器 {self.coordinator_addr} 请求打洞到 {self.provider_addr}")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待协调服务器响应
try:
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] != 'success':
logger.error(f"打洞请求失败: {response['message']}")
return False
# 向服务提供者发送打洞包
logger.info(f"尝试打洞到 {self.provider_addr}...")
for _ in range(10):
self.udp_sock.sendto(json.dumps({
'action': 'punch',
'client_id': self.client_id
}).encode(), self.provider_addr)
time.sleep(0.2)
# 检查连通性
self.udp_sock.settimeout(10.0)
try:
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
data, addr = self.udp_sock.recvfrom(4096)
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
logger.info("打洞成功! 已建立UDP连接")
return True
else:
logger.error(f"错误的打洞响应{data}")
return False
except socket.timeout:
logger.error("打洞失败: 未收到响应")
return False
finally:
self.udp_sock.settimeout(None)
def handle_punch_response(self, message, addr):
"""
处理打洞响应
:param message: 打洞响应消息
:param addr: 服务提供者地址
"""
logger.debug(f"收到来自 {addr} 的打洞响应")
def handle_stop_conn(self, message, addr):
"""
处理停止连接请求
:param message: 停止连接请求消息
:param addr: 服务提供者地址
"""
conn_id = message['conn_id']
if conn_id in self.active_connections:
self.active_connections[conn_id].close()
self.active_connections.pop(conn_id, None)
logger.debug(f"关闭本地连接 {conn_id}")
def tcp_listener(self):
"""
监听本地TCP连接
"""
while self.running:
try:
client_sock, client_addr = self.tcp_sock.accept()
logger.debug(f"新的本地连接来自 {client_addr}")
# 为每个连接生成唯一ID
conn_id = str(uuid.uuid4())
# 存储连接
self.active_connections[conn_id] = client_sock
# 请求服务提供者建立连接
self.udp_sock.sendto(json.dumps({
'action': 'connect',
'client_id': self.client_id,
'conn_id': conn_id,
'service_name': self.service_name
}).encode(), self.provider_addr)
time.sleep(0.5)
# 启动数据转发线程
threading.Thread(
target=self.forward_data,
args=(conn_id, client_sock),
daemon=True
).start()
except Exception as e:
logger.error(f"处理本地连接时发生错误: {str(e)}")
def handel_punch(self, message, addr):
"""
处理UDP打洞请求
:param message: 打洞请求消息
:param addr: 服务提供者地址
"""
logger.debug(f"收到来自 {addr} 的UDP打洞请求")
self.udp_sock.sendto(json.dumps({
'action': 'punch_response',
'client_id': self.client_id,
'provider_id': self.provider_id
}).encode(), addr)
def handle_data(self, message, addr):
"""
处理数据消息
:param message: 数据消息
:param addr: 服务提供者地址
"""
conn_id = message['conn_id']
data = bytes.fromhex(message['data'])
if conn_id in self.active_connections:
# 转发数据到本地客户端
logger.debug(f"收到来自 {addr} 的数据,转发到本地连接 {conn_id}")
self.active_connections[conn_id].sendall(data)
else:
self.udp_sock.sendto(json.dumps({'action': 'stop_conn', 'conn_id': conn_id}).encode(), addr)
logger.debug(f"收到来自 {addr} 的数据,但未找到对应的本地连接")
def forward_data(self, conn_id, client_sock):
"""
转发本地TCP数据到UDP隧道
:param conn_id: 连接ID
:param client_sock: 本地客户端套接字
"""
try:
while True:
# 从本地客户端读取数据
data = client_sock.recv(4096)
if not data:
break
# 通过UDP发送给服务提供者
logger.debug(f"发送数据到服务提供者: {self.provider_addr}")
self.udp_sock.sendto(json.dumps({
'action': 'data',
'conn_id': conn_id,
'data': data.hex() # 十六进制编码二进制数据
}).encode(), self.provider_addr)
except Exception as e:
logger.error(f"转发数据失败: {str(e)}")
finally:
client_sock.close()
if conn_id in self.active_connections:
del self.active_connections[conn_id]
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), self.provider_addr)
logger.debug(f"连接 {conn_id} 已关闭")
def udp_listener(self):
"""
监听UDP消息并处理
"""
while self.running:
try:
data, addr = self.udp_sock.recvfrom(65535)
logger.debug(f"收到来自 {addr} 的消息: {data}")
message = json.loads(data.decode())
# 使用字典映射处理不同消息类型
action_handlers = {
'punch_response': self.handle_punch_response,
'data': self.handle_data,
'stop_conn': self.handle_stop_conn,
'punch': self.handel_punch
}
# 提交任务到线程池
if message.get('action') in action_handlers:
self.thread_pool.submit(action_handlers[message['action']], message, addr)
else:
logger.warning(f"收到未知消息: {message}")
except Exception as e:
logger.error(f"处理UDP消息时发生错误: {str(e)}")
def run(self):
"""
运行服务连接端
"""
# 请求服务
if not self.request_service():
logger.error("服务请求失败,退出程序")
return
# 执行打洞
if not self.punch_hole():
logger.error("打洞失败,退出程序")
return
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start()
# 启动TCP监听线程
threading.Thread(target=self.tcp_listener, daemon=True).start()
# 保持主线程运行
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
self.running = False
self.udp_sock.sendto(json.dumps({
'action': 'stop_client',
'client_id': self.client_id
}).encode(), self.provider_addr)
self.udp_sock.close()
self.tcp_sock.close()
logger.info("服务连接端已停止")
if __name__ == '__main__':
# 配置信息
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
SERVICE_NAME = "ssh-jk-54htrsd324n6"
# SERVICE_NAME = "terraria-jk-2cxht5"
# SERVICE_NAME = "minecraft-jk-ytsvb54u6"
# SERVICE_NAME = "alist-jk-5shf43h6fdg"
LOCAL_PORT = 12345 # 本地映射端口
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
connector.run()

View File

@ -1,232 +0,0 @@
import socket
import json
import threading
import time
import uuid
class ServiceConnector:
def __init__(self, coordinator_addr, service_name, local_port):
self.target_id = None
self.coordinator_addr = coordinator_addr
self.service_name = service_name
self.local_port = local_port
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# 创建TCP套接字用于本地监听
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_sock.bind(('127.0.0.1', local_port))
self.tcp_sock.listen(5)
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
# 存储活动连接
self.active_connections = {}
self.provider_addr = None
self.internal_port = None
self.running = True
def request_service(self):
"""向协调服务器请求服务"""
message = {
'action': 'request',
'service_name': self.service_name,
'client_id': self.client_id
}
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待响应
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] == 'success':
self.provider_addr = tuple(response['provider_addr'])
self.internal_port = response['internal_port']
self.target_id = response['target_id']
print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
return True
else:
print(f"服务请求失败: {response['message']}")
return False
def punch_hole(self):
"""执行UDP打洞"""
if not self.provider_addr:
return False
# 请求打洞
message = {
'action': 'punch_request',
'client_id': self.client_id,
'target_id': self.target_id
}
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待协调服务器响应
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] != 'success':
print(f"打洞请求失败: {response['message']}")
return False
# 向服务提供者发送打洞包
print(f"尝试打洞到 {self.provider_addr}...")
for _ in range(5):
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
time.sleep(0.5)
# 检查连通性
self.udp_sock.settimeout(10.0)
try:
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
data, addr = self.udp_sock.recvfrom(1024)
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
print("打洞成功! 已建立UDP连接")
return True
else:
print(f"错误的打洞响应{data}")
return False
except socket.timeout:
print("打洞失败: 未收到响应")
return False
finally:
self.udp_sock.settimeout(None)
def udp_listener(self):
"""监听UDP消息"""
while self.running:
try:
data, addr = self.udp_sock.recvfrom(65535)
message = json.loads(data.decode())
if message['action'] == 'punch_response':
# 打洞响应 - 确认连通性
print(f"收到来自 {addr} 的打洞响应")
elif message['action'] == 'data':
if message['conn_id'] in self.active_connections:
# 转发数据到本地客户端
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}")
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data']))
else:
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': message['conn_id']
}).encode(), addr)
elif message['action'] == 'stop_conn':
# 停止连接
if message['conn_id'] in self.active_connections:
self.active_connections[message['conn_id']].close()
del self.active_connections[message['conn_id']]
print(f"已关闭本地连接 {message['conn_id']}")
else:
print(f"收到未知消息: {message}")
except Exception as e:
print(e)
def tcp_listener(self):
"""监听本地TCP连接"""
while self.running:
try:
client_sock, client_addr = self.tcp_sock.accept()
print(f"新的本地连接来自 {client_addr}")
# 为每个连接生成唯一ID
conn_id = str(uuid.uuid4())
# 存储连接
self.active_connections[conn_id] = client_sock
# 请求服务提供者建立连接
self.udp_sock.sendto(json.dumps({
'action': 'connect',
'client_id': self.client_id,
'conn_id': conn_id
}).encode(), self.provider_addr)
time.sleep(0.5)
# 启动数据转发线程
threading.Thread(
target=self.forward_data,
args=(conn_id, client_sock),
daemon=True
).start()
except:
pass
def forward_data(self, conn_id, client_sock):
"""转发本地TCP数据到UDP隧道"""
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
try:
while True:
# 从本地客户端读取数据
data = client_sock.recv(65535)
if not data:
break
# 通过UDP发送给服务提供者
print(f"发送数据到服务提供者: {self.provider_addr}")
self.udp_sock.sendto(json.dumps({
'action': 'data',
'conn_id': conn_id,
'data': data.hex() # 十六进制编码二进制数据
}).encode(), self.provider_addr)
# 分片发送数据
# for i in range(0, len(data), CHUNK_SIZE):
# chunk = data[i:i + CHUNK_SIZE]
# self.udp_sock.sendto(json.dumps({
# 'action': 'data',
# 'conn_id': conn_id,
# 'data': chunk.hex()
# }).encode(), self.provider_addr)
except:
pass
finally:
client_sock.close()
if conn_id in self.active_connections:
del self.active_connections[conn_id]
def run(self):
"""运行服务连接端"""
# 请求服务
if not self.request_service():
return
# 执行打洞
if not self.punch_hole():
return
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start()
# 启动TCP监听线程
threading.Thread(target=self.tcp_listener, daemon=True).start()
# 保持主线程运行
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
self.running = False
self.udp_sock.sendto(json.dumps({
'action': 'stop_client',
'client_id': self.client_id
}).encode(), self.provider_addr)
self.udp_sock.close()
self.tcp_sock.close()
print("服务连接端已停止")
if __name__ == '__main__':
# 配置信息
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
SERVICE_NAME = "my_game_server"
LOCAL_PORT = 12345 # 本地映射端口
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
connector.run()

View File

@ -1,51 +1,81 @@
import socket import socket
import json import json
import time import time
import logging
from collections import defaultdict from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import threading
# 设置日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class CoordinatorServer: class CoordinatorServer:
def __init__(self, host='0.0.0.0', port=5000): def __init__(self, host='0.0.0.0', port=5000):
"""
初始化协调服务器
:param host: 服务器绑定的主机地址默认为 '0.0.0.0'
:param port: 服务器监听的端口号默认为 5000
"""
self.host = host self.host = host
self.port = port self.port = port
self.services = {} # 服务名 -> (公网地址, 内网端口) self.clients = defaultdict(dict) # 客户端信息字典
self.clients = defaultdict(dict) # 客户端ID -> 信息 self.providers = defaultdict(dict) # 服务提供端信息字典
self.services = defaultdict(tuple) # 服务名称与提供者信息的映射
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind((host, port)) self.udp_sock.bind((host, port))
self.udp_sock.settimeout(1) # 设置 UDP 套接字接收超时时间为 1 秒
self.running = True self.running = True
print(f"协调服务器运行在 {host}:{port}") self.executor = ThreadPoolExecutor(max_workers=10) # 创建线程池
logger.debug(f"协调服务器运行在 {host}:{port}")
# 启动定时清理任务
self.executor.submit(self.cleanup_expired_services)
def handle_register(self, data, addr): def handle_register(self, data, addr):
"""处理服务注册请求""" """
处理服务注册请求
:param data: 包含服务注册信息的字典
:param addr: 发送请求的客户端地址
"""
try: try:
service_name = data['service_name'] provider_id = data['provider_id']
internal_port = data['internal_port'] services = data['services']
client_id = data['client_id']
# 记录客户端信息 # 记录客户端信息
self.clients[client_id] = { self.providers[provider_id] = {
'addr': addr, 'addr': addr,
'service_name': service_name, 'services': services,
'internal_port': internal_port,
'last_seen': time.time() 'last_seen': time.time()
} }
# 注册服务 # 遍历 services 字典,记录每个服务的名称和端口
self.services[service_name] = (addr, internal_port) for service_name, service_port in services.items():
print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}") self.services[service_name] = (addr, service_port, provider_id)
return {'status': 'success', 'message': '服务注册成功'} logger.info(f"注册来自{addr}{services}")
# 直接回复注册成功消息
response = {'status': 'success', 'message': '服务注册成功'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
except Exception as e: except Exception as e:
return {'status': 'error', 'message': str(e)} response = {'status': 'error', 'message': str(e)}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
def handle_request(self, data, addr): def handle_request(self, data, addr):
"""处理服务请求""" """
处理服务请求
:param data: 包含服务请求信息的字典
:param addr: 发送请求的客户端地址
"""
try: try:
service_name = data['service_name'] service_name = data['service_name']
client_id = data['client_id'] client_id = data['client_id']
if service_name not in self.services: if service_name not in self.services:
return {'status': 'error', 'message': '服务未找到'} response = {'status': 'error', 'message': '服务未找到'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
# 记录请求客户端信息 # 记录请求客户端信息
self.clients[client_id] = { self.clients[client_id] = {
@ -55,55 +85,118 @@ class CoordinatorServer:
} }
# 获取服务提供者的信息 # 获取服务提供者的信息
provider_addr, internal_port = self.services[service_name] provider_addr, internal_port, provider_id = self.services[service_name]
target_id = [
client_id
for client_id, info in self.clients.items()
if info['service_name'] == service_name
][0]
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") logger.debug(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
return { response = {
'status': 'success', 'status': 'success',
'provider_addr': provider_addr, 'provider_addr': provider_addr,
'internal_port': internal_port, 'internal_port': internal_port,
'target_id': target_id 'provider_id': provider_id
} }
self.udp_sock.sendto(json.dumps(response).encode(), addr)
except Exception as e: except Exception as e:
return {'status': 'error', 'message': str(e)} response = {'status': 'error', 'message': str(e)}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
def handle_punch_request(self, data, addr): def handle_punch_request(self, data, addr):
"""处理打洞请求""" """
处理打洞请求
:param data: 包含打洞请求信息的字典
:param addr: 发送请求的客户端地址
"""
try: try:
client_id = data['client_id'] client_id = data['client_id']
target_id = data['target_id'] provider_id = data['provider_id']
# 获取目标客户端信息 # 获取目标客户端信息
if target_id not in self.clients: if provider_id not in self.providers:
return {'status': 'error', 'message': '目标客户端未找到'} response = {'status': 'error', 'message': '目标提供端未找到'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
target_addr = self.clients[target_id]['addr'] provider_addr = self.providers[provider_id]['addr']
print(f"打洞请求: {addr} -> {target_addr}") logger.info(f"打洞请求: {addr} -> {provider_id}")
# 通知双方对方的地址 # 通知双方对方的地址
self.udp_sock.sendto(json.dumps({ self.udp_sock.sendto(json.dumps({
'action': 'punch_request', 'action': 'punch_request',
'client_id': client_id, 'client_id': client_id,
'client_addr': addr 'client_addr': addr
}).encode(), target_addr) }).encode(), provider_addr)
return { self.udp_sock.sendto(json.dumps({
'status': 'success', 'status': 'success',
'target_addr': target_addr 'provider_addr': provider_addr
} }).encode(), addr)
except Exception as e: except Exception as e:
return {'status': 'error', 'message': str(e)} logger.error(f"处理打洞请求时出错: {e}")
response = {'status': 'error', 'message': str(e)}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
def handle_stop_provider(self, data, addr):
"""
处理停止服务请求
:param data: 包含停止服务信息的字典
:param addr: 发送请求的客户端地址
"""
try:
service_name = data['service_name']
self.services.pop(service_name, None)
response = {'status': 'success', 'message': '服务停止成功'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
except Exception as e:
response = {'status': 'error', 'message': str(e)}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
def handle_heartbeat(self, data, addr):
"""
处理心跳包
:param data: 包含心跳信息的字典
:param addr: 发送心跳的客户端地址
"""
try:
provider_id = data['provider_id']
self.providers[provider_id]['last_seen'] = time.time()
response = {'status': 'success', 'message': '心跳更新成功'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
except Exception as e:
response = {'status': 'error', 'message': str(e)}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
def cleanup_expired_services(self):
"""
定时清理过期服务
每20秒检查一次移除超过30秒未更新心跳的服务
"""
while self.running:
time.sleep(60) # 每60秒检查一次
current_time = time.time()
expired_providers = []
for provider_id, provider in self.providers.items():
if current_time - provider['last_seen'] > 60: # 心跳包最后更新时间大于30秒
expired_providers.append(provider_id)
logger.info(f"服务过期: {provider['addr']}")
for provider_id in expired_providers:
provider = self.providers[provider_id]
for service_name in provider['services'].keys(): # 使用新的 services 字段
self.services.pop(service_name, None)
self.providers.pop(provider_id, None)
def run(self): def run(self):
"""运行协调服务器""" """
print("协调服务器已启动,等待连接...") 运行协调服务器
"""
logger.info(f"协调服务器已启动,端口{self.udp_sock.getsockname()[1]},等待连接...")
action_handlers = {
'register': self.handle_register, # 服务注册处理行为
'request': self.handle_request, # 服务请求处理行为
'punch_request': self.handle_punch_request, # 打洞处理行为
'stop_provider': self.handle_stop_provider, # 停止服务行为
'heartbeat': self.handle_heartbeat # 心跳处理行为
}
while self.running: while self.running:
try: try:
data, addr = self.udp_sock.recvfrom(4096) data, addr = self.udp_sock.recvfrom(4096)
@ -111,30 +204,56 @@ class CoordinatorServer:
message = json.loads(data.decode()) message = json.loads(data.decode())
action = message.get('action') action = message.get('action')
if action == 'register': handler = action_handlers.get(action)
response = self.handle_register(message, addr) if handler: # 如果存在对应的处理行为,则执行它
elif action == 'request': self.executor.submit(handler, message, addr).result()
response = self.handle_request(message, addr) else: # 如果没有对应的处理行为,则返回错误响应
elif action == 'punch_request': self.udp_sock.sendto(json.dumps({
response = self.handle_punch_request(message, addr) 'status': 'error',
elif action == 'stop_provider': 'message': '无效操作'}).encode(), addr)
self.services.pop(message['service_name'], None)
response = {'status': 'success', 'message': '服务停止成功'}
else:
response = {'status': 'error', 'message': '无效操作'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
except json.JSONDecodeError: except json.JSONDecodeError:
self.udp_sock.sendto(json.dumps({ self.udp_sock.sendto(json.dumps({
'status': 'error', 'status': 'error',
'message': '无效的JSON数据' 'message': '无效的JSON数据'
}).encode(), addr) }).encode(), addr)
except socket.timeout: # 捕获 UDP 接收超时异常
pass # 不做任何处理,允许主线程继续执行
except Exception as e: except Exception as e:
print(f"服务器错误: {str(e)}") logger.debug(f"服务器错误: {str(e)}")
self.udp_sock.close() def start(self):
"""
启动协调服务器
"""
# 创建线程运行 run 方法
server_thread = threading.Thread(target=self.run)
server_thread.start()
try:
# 主线程捕获键盘打断信号
while self.running:
time.sleep(1) # 防止主线程空转
except KeyboardInterrupt:
logger.info("检测到键盘打断,准备退出...")
self.running = False
# 通知所有提供端停止服务
for provider_id, provider_info in self.providers.items():
try:
self.udp_sock.sendto(json.dumps({
'action': 'stop_provider',
}).encode(), provider_info['addr'])
logger.info(f"已通知提供端 {provider_id} 停止服务")
except Exception as e:
logger.error(f"通知提供端 {provider_id} 停止服务时出错: {e}")
server_thread.join() # 等待服务器线程退出
# 关闭线程池和套接字
self.executor.shutdown()
self.udp_sock.close()
logger.info("协调服务器已安全退出")
if __name__ == '__main__': if __name__ == '__main__':
server = CoordinatorServer() server = CoordinatorServer()
server.run() server.start()

348
provider.py Normal file
View File

@ -0,0 +1,348 @@
import socket
import json
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ServiceProvider:
def __init__(self, coordinator_addr, services):
"""
初始化服务提供者
:param coordinator_addr: 协调服务器地址 (IP, port)
:param services: 提供的服务列表 {服务名: 端口号}
"""
self.provider_id = f"provider-{uuid.uuid4().hex[:8]}"
self.coordinator_addr = coordinator_addr
self.services = services
# 创建UDP套接字用于协调通信
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# 创建线程池用于处理UDP消息
self.thread_pool = ThreadPoolExecutor(max_workers=10)
self.clients = {}
# 存储活动连接
self.active_connections = {}
self.running = True
# 心跳线程
self.heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True)
def send_heartbeat(self):
"""
发送心跳包到协调服务器
"""
while self.running:
try:
message = {
'action': 'heartbeat',
'provider_id': self.provider_id
}
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
logger.debug(f"发送心跳包给协调服务器 {self.coordinator_addr}")
time.sleep(20) # 每20秒发送一次心跳包
except Exception as e:
logger.error(f"发送心跳包失败: {str(e)}")
def register_service(self):
"""
向协调服务器注册服务
:return: 注册是否成功
"""
message = {
'action': 'register',
'services': self.services,
'provider_id': self.provider_id
}
logger.info(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.services}'")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待响应
try:
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] == 'success':
logger.info(f"服务 '{self.services}' 注册成功")
return True
else:
logger.error(f"注册失败: {response['message']}")
return False
except Exception as e:
logger.error(f"注册服务时发生错误: {str(e)}")
return False
def handle_punch(self, message, addr):
"""
处理打洞请求
:param message: 打洞请求消息
:param addr: 客户端地址
"""
self.udp_sock.sendto(json.dumps(
{'action': 'punch_response',
'client_id': message['client_id'],
'provider_id': self.provider_id
}).encode(), addr)
logger.debug(f"收到来自 {addr} 的打洞请求,已响应")
def handle_punch_response(self, _, addr):
"""
处理打洞响应
:param addr: 客户端地址
"""
logger.debug(f"收到来自 {addr} 的打洞响应")
def handle_connect_request(self, message, addr):
"""
处理连接请求
:param message: 连接请求消息
:param addr: 客户端地址
"""
conn_id = message['conn_id']
client_id = message['client_id']
service_name = message['service_name']
logger.debug(f"收到来自 {addr} 的连接请求")
threading.Thread(
target=self.handle_connection,
args=(conn_id, addr, client_id, service_name),
daemon=True
).start()
def handle_punch_request(self, message, _):
"""
处理打洞请求
:param message: 打洞请求消息
"""
for i in range(10):
try:
self.udp_sock.sendto(json.dumps({
'action': 'punch',
'client_id': message['client_id'],
'provider_id': self.provider_id,
}).encode(), tuple(message['client_addr']))
time.sleep(0.5)
except Exception as e:
logger.error(f"打洞失败: {str(e)}")
time.sleep(1)
def handle_data(self, message, addr):
"""
处理数据消息
:param message: 数据消息
:param addr: 客户端地址
"""
conn_id = message['conn_id']
data = bytes.fromhex(message['data'])
if conn_id in self.active_connections:
# 转发数据到本地服务
logger.debug(f"收到来自 {addr} 的数据,转发到本地服务")
self.active_connections[conn_id]['local_sock'].sendall(data)
else:
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), addr)
logger.debug(f"收到来自 {addr} 的数据,但未找到对应的连接")
def handle_stop_conn(self, message, _):
"""
处理停止连接请求
:param message: 停止连接请求消息
"""
conn_id = message['conn_id']
if conn_id in self.active_connections:
self.active_connections[conn_id]['local_sock'].close()
self.active_connections.pop(conn_id, None)
def handle_stop_client(self, message, addr):
"""
处理停止客户端请求
:param message: 停止客户端请求消息
:param addr: 客户端地址
"""
client_id = message['client_id']
for conn_id, conn_info in self.active_connections.items():
if conn_info['client_id'] == client_id:
conn_info['local_sock'].close()
self.active_connections.pop(conn_id, None)
def handle_stop_provider(self, message, _):
"""
处理停止服务提供者请求
:param message: 停止服务提供者请求消息
"""
logger.info("收到停止服务提供者请求,正在关闭所有连接...")
for conn_id, conn_info in self.active_connections.items():
conn_info['local_sock'].close()
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), conn_info['client_addr'])
self.active_connections.clear()
self.running = False
self.udp_sock.close()
self.thread_pool.shutdown(wait=True)
logger.info("服务提供者已停止")
def handle_connection(self, conn_id, client_addr, client_id, service_name):
"""
处理来自客户端的连接
:param conn_id: 连接ID
:param client_addr: 客户端地址
:param client_id: 客户端ID
:param service_name: 服务名称
"""
internal_port = self.services[service_name]
try:
# 接受本地服务连接
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
local_sock.connect(('127.0.0.1', internal_port))
if not local_sock:
logger.error("无法连接到本地服务")
return
# 创建与客户端的UDP隧道
logger.debug(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', internal_port)}")
# 存储连接
self.active_connections[conn_id] = {
'local_sock': local_sock,
'client_addr': client_addr,
'client_id': client_id
}
# 通知客户端连接就绪
self.udp_sock.sendto(json.dumps({
'action': 'connected',
'client_id': conn_id
}).encode(), client_addr)
# 启动数据转发
self.forward_data(conn_id, local_sock, client_addr)
except Exception as e:
logger.error(f"连接失败: {str(e)}")
self.udp_sock.sendto(json.dumps({
'action': 'connect_failed',
'client_id': conn_id,
'message': str(e)
}).encode(), client_addr)
def forward_data(self, conn_id, local_sock, client_addr):
"""
转发TCP数据到UDP隧道
:param conn_id: 连接ID
:param local_sock: 本地服务套接字
:param client_addr: 客户端地址
"""
try:
while True:
# 从本地服务读取数据
data = local_sock.recv(4096)
if not data:
break
# 通过UDP发送给客户端
self.udp_sock.sendto(json.dumps({
'action': 'data',
'conn_id': conn_id,
'data': data.hex() # 十六进制编码二进制数据
}).encode(), client_addr)
logger.debug(f"转发数据给客户端{client_addr}")
except Exception as e:
logger.error(f"转发数据失败: {str(e)}")
finally:
local_sock.close()
if conn_id in self.active_connections:
del self.active_connections[conn_id]
logger.debug(f"连接 {conn_id} 已关闭")
def udp_listener(self):
"""
监听UDP消息并处理
"""
data = None
while self.running:
try:
data, addr = self.udp_sock.recvfrom(4096)
logger.debug(f"收到来自 {addr} 的消息: {data}")
message = json.loads(data.decode())
# 使用字典映射处理不同消息类型
action_handlers = {
'punch': self.handle_punch,
'punch_check': self.handle_punch,
'punch_response': self.handle_punch_response,
'connect': self.handle_connect_request,
'punch_request': self.handle_punch_request,
'data': self.handle_data,
'stop_conn': self.handle_stop_conn,
'stop_client': self.handle_stop_client,
'stop_provider': self.handle_stop_provider,
}
# 提交任务到线程池
if message.get('action') in action_handlers:
self.thread_pool.submit(action_handlers[message['action']], message, addr)
elif message.get('status') == 'error':
logger.error(f"来自 {addr} 的错误消息: {message}")
elif message.get('status') == 'success':
logger.debug(f"来自 {addr} 的成功消息: {message}")
else:
logger.warning(f"收到未知消息: {message}")
except Exception as e:
logger.error(f"处理UDP消息时发生错误: {str(e)}")
if data:
logger.error(f"无法处理消息: {data}")
def run(self):
"""
运行服务提供端
"""
# 注册服务
if not self.register_service():
logger.error("服务注册失败,退出程序")
return
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start()
# 启动心跳线程
self.heartbeat_thread.start()
# 保持主线程运行
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
self.running = False
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
for conn_id, conn_info in self.active_connections.items():
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), conn_info['client_addr'])
self.udp_sock.close()
logger.info("服务提供端已停止")
if __name__ == '__main__':
# 配置信息
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
SERVICES = {
'terraria-jk-2cxht5': 5001,
'minecraft-jk-ytsvb54u6': 5002,
'alist-jk-5shf43h6fdg': 5244,
'ssh-jk-54htrsd324n6': 22
}
provider = ServiceProvider(COORDINATOR_ADDR, SERVICES)
provider.run()

View File

@ -1,220 +0,0 @@
import socket
import json
import threading
import time
import uuid
class ServiceProvider:
def __init__(self, coordinator_addr, service_name, internal_port):
self.coordinator_addr = coordinator_addr
self.service_name = service_name
self.internal_port = internal_port
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# # 创建TCP套接字用于服务监听
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.tcp_sock.bind(('0.0.0.0', internal_port))
# self.tcp_sock.listen(5)
print(f"服务端监听在端口 {internal_port}")
# 存储活动连接
self.active_connections = {}
self.running = True
def register_service(self):
"""向协调服务器注册服务"""
message = {
'action': 'register',
'service_name': self.service_name,
'internal_port': self.internal_port,
'external_port': self.udp_sock.getsockname()[1],
'client_id': self.client_id
}
print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待响应
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] == 'success':
print(f"服务 '{self.service_name}' 注册成功")
else:
print(f"注册失败: {response['message']}")
def udp_listener(self):
"""监听UDP消息"""
while self.running:
try:
data, addr = self.udp_sock.recvfrom(4096)
print(f"收到来自 {addr} 的消息: {data.decode()}")
message = json.loads(data.decode())
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
# 打洞请求 - 发送响应以确认连通性
self.udp_sock.sendto(json.dumps(
{'action': 'punch_response',
'client_id': message['client_id']
}).encode(), addr)
print(f"收到来自 {addr} 的打洞请求,已响应")
elif message.get('action') == 'punch_response':
# 打洞响应 - 确认打洞成功
print(f"收到来自 {addr} 的打洞响应")
elif message.get('action') == 'connect':
# 新的连接请求
conn_id = message['conn_id']
client_id = message['client_id']
print(f"收到来自 {addr} 的连接请求")
threading.Thread(
target=self.handle_connection,
args=(conn_id, addr, client_id),
daemon=True
).start()
elif message.get('action') == 'punch_request':
client_addr = message['client_addr']
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
self.punch_hole(message)
elif message.get('action') == 'data':
# 接收到来自客户端的数据
conn_id = message['conn_id']
data = bytes.fromhex(message['data'])
if conn_id in self.active_connections:
# 转发数据到本地服务
print(f"收到来自 {addr} 的数据,转发到本地服务")
self.active_connections[conn_id]['local_sock'].sendall(data)
else:
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
elif message.get('action') == 'stop_conn':
conn_id = message['conn_id']
if conn_id in self.active_connections:
self.active_connections[conn_id]['local_sock'].close()
self.active_connections.pop(conn_id)
elif message.get('action') == 'stop_client':
client_id = message['client_id']
for conn_id, conn_info in self.active_connections.items():
if conn_info['client_id'] == client_id:
conn_info['local_sock'].close()
self.active_connections.pop(conn_id)
else:
print(f"收到未知消息: {message}")
except Exception as e:
print(e)
pass
def handle_connection(self, conn_id, client_addr, client_id):
"""处理来自客户端的连接"""
try:
# 接受本地服务连接
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
local_sock.connect(('127.0.0.1', self.internal_port))
if not local_sock:
print("无法连接到本地服务")
return
# 创建与客户端的UDP隧道
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
# 存储连接
self.active_connections[conn_id] = {
'local_sock': local_sock,
'client_addr': client_addr,
'client_id': client_id
}
# 通知客户端连接就绪
self.udp_sock.sendto(json.dumps({
'action': 'connected',
'client_id': conn_id
}).encode(), client_addr)
# 启动数据转发
self.forward_data(conn_id, local_sock, client_addr)
except Exception as e:
print(f"连接失败: {str(e)}")
self.udp_sock.sendto(json.dumps({
'action': 'connect_failed',
'client_id': conn_id,
'message': str(e)
}).encode(), client_addr)
def forward_data(self, conn_id, local_sock, client_addr):
"""转发TCP数据到UDP隧道"""
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
try:
while True:
# 从本地服务读取数据
data = local_sock.recv(65535)
if not data:
break
# 通过UDP发送给客户端
self.udp_sock.sendto(json.dumps({
'action': 'data',
'conn_id': conn_id,
'data': data.hex() # 十六进制编码二进制数据
}).encode(), client_addr)
print(f"转发数据给客户端{client_addr}")
# for i in range(0, len(data), CHUNK_SIZE):
# chunk = data[i:i + CHUNK_SIZE]
# self.udp_sock.sendto(json.dumps({
# 'action': 'data',
# 'conn_id': conn_id,
# 'data': chunk.hex()
# }).encode(), client_addr)
except Exception as e:
print(f"转发数据失败: {str(e)}")
finally:
local_sock.close()
if conn_id in self.active_connections:
del self.active_connections[conn_id]
print(f"连接 {conn_id} 已关闭")
def run(self):
"""运行服务提供端"""
# 注册服务
self.register_service()
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start()
# 保持主线程运行
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
self.running = False
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
for conn_id, conn_info in self.active_connections.items():
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), conn_info['client_addr'])
self.udp_sock.close()
print("服务提供端已停止")
def punch_hole(self, message):
for i in range(5):
try:
self.udp_sock.sendto(json.dumps({
'action': 'punch',
'client_id': message['client_id']
}).encode(), tuple(message['client_addr']))
time.sleep(0.2)
except Exception as e:
print(f"打洞失败: {str(e)}")
time.sleep(1)
if __name__ == '__main__':
# 配置信息
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
SERVICE_NAME = "my_game_server"
INTERNAL_PORT = 5001 # 内网游戏服务器端口
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
provider.run()