diff --git a/connector.py b/connector.py new file mode 100644 index 0000000..bdb1934 --- /dev/null +++ b/connector.py @@ -0,0 +1,317 @@ +import socket +import json +import threading +from concurrent.futures import ThreadPoolExecutor +import time +import uuid +import logging + +# 配置日志 +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class ServiceConnector: + def __init__(self, coordinator_addr, service_name, local_port): + """ + 初始化服务连接器 + :param coordinator_addr: 协调服务器地址 (IP, port) + :param service_name: 请求的服务名称 + :param local_port: 本地监听端口 + """ + self.coordinator_addr = coordinator_addr + self.service_name = service_name + self.local_port = local_port + self.client_id = f"connector-{uuid.uuid4().hex[:8]}" + + # 创建UDP套接字用于协调通信 + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_sock.bind(('0.0.0.0', 0)) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + + # 创建TCP套接字用于本地监听 + self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.tcp_sock.bind(('127.0.0.1', local_port)) + self.tcp_sock.listen(5) + logger.info(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") + + # 注册线程池 + self.thread_pool = ThreadPoolExecutor(max_workers=10) + + # 存储活动连接 + self.active_connections = {} + self.provider_addr = None + self.provider_id = None + self.internal_port = None + self.running = True + + def request_service(self): + """ + 向协调服务器请求服务 + :return: 请求是否成功 + """ + message = { + 'action': 'request', + 'service_name': self.service_name, + 'client_id': self.client_id + } + logger.info(f"向协调服务器 {self.coordinator_addr} 请求服务 '{self.service_name}'") + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + + # 等待响应 + try: + data, _ = self.udp_sock.recvfrom(4096) + response = json.loads(data.decode()) + if response['status'] == 'success': + self.provider_addr = tuple(response['provider_addr']) + self.internal_port = response['internal_port'] + self.provider_id = response['provider_id'] + logger.info(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}") + return True + else: + logger.error(f"服务请求失败: {response['message']}") + return False + except Exception as e: + logger.error(f"请求服务时发生错误: {str(e)}") + return False + + def punch_hole(self): + """ + 执行UDP打洞 + :return: 打洞是否成功 + """ + if not self.provider_addr: + return False + + # 请求协服务器发起打洞 + message = { + 'action': 'punch_request', + 'client_id': self.client_id, + 'provider_id': self.provider_id + } + logger.info(f"向协调服务器 {self.coordinator_addr} 请求打洞到 {self.provider_addr}") + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + + # 等待协调服务器响应 + try: + data, _ = self.udp_sock.recvfrom(4096) + response = json.loads(data.decode()) + if response['status'] != 'success': + logger.error(f"打洞请求失败: {response['message']}") + return False + + # 向服务提供者发送打洞包 + logger.info(f"尝试打洞到 {self.provider_addr}...") + for _ in range(10): + self.udp_sock.sendto(json.dumps({ + 'action': 'punch', + 'client_id': self.client_id + }).encode(), self.provider_addr) + time.sleep(0.2) + + # 检查连通性 + self.udp_sock.settimeout(10.0) + try: + self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) + data, addr = self.udp_sock.recvfrom(4096) + if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: + logger.info("打洞成功! 已建立UDP连接") + return True + else: + logger.error(f"错误的打洞响应{data}") + return False + except socket.timeout: + logger.error("打洞失败: 未收到响应") + return False + finally: + self.udp_sock.settimeout(None) + + def handle_punch_response(self, message, addr): + """ + 处理打洞响应 + :param message: 打洞响应消息 + :param addr: 服务提供者地址 + """ + logger.debug(f"收到来自 {addr} 的打洞响应") + + def handle_stop_conn(self, message, addr): + """ + 处理停止连接请求 + :param message: 停止连接请求消息 + :param addr: 服务提供者地址 + """ + conn_id = message['conn_id'] + if conn_id in self.active_connections: + self.active_connections[conn_id].close() + self.active_connections.pop(conn_id, None) + logger.debug(f"关闭本地连接 {conn_id}") + + def tcp_listener(self): + """ + 监听本地TCP连接 + """ + while self.running: + try: + client_sock, client_addr = self.tcp_sock.accept() + logger.debug(f"新的本地连接来自 {client_addr}") + + # 为每个连接生成唯一ID + conn_id = str(uuid.uuid4()) + + # 存储连接 + self.active_connections[conn_id] = client_sock + + # 请求服务提供者建立连接 + self.udp_sock.sendto(json.dumps({ + 'action': 'connect', + 'client_id': self.client_id, + 'conn_id': conn_id, + 'service_name': self.service_name + }).encode(), self.provider_addr) + + time.sleep(0.5) + + # 启动数据转发线程 + threading.Thread( + target=self.forward_data, + args=(conn_id, client_sock), + daemon=True + ).start() + + except Exception as e: + logger.error(f"处理本地连接时发生错误: {str(e)}") + + def handel_punch(self, message, addr): + """ + 处理UDP打洞请求 + :param message: 打洞请求消息 + :param addr: 服务提供者地址 + """ + logger.debug(f"收到来自 {addr} 的UDP打洞请求") + self.udp_sock.sendto(json.dumps({ + 'action': 'punch_response', + 'client_id': self.client_id, + 'provider_id': self.provider_id + }).encode(), addr) + + + def handle_data(self, message, addr): + """ + 处理数据消息 + :param message: 数据消息 + :param addr: 服务提供者地址 + """ + conn_id = message['conn_id'] + data = bytes.fromhex(message['data']) + if conn_id in self.active_connections: + # 转发数据到本地客户端 + logger.debug(f"收到来自 {addr} 的数据,转发到本地连接 {conn_id}") + self.active_connections[conn_id].sendall(data) + else: + self.udp_sock.sendto(json.dumps({'action': 'stop_conn', 'conn_id': conn_id}).encode(), addr) + logger.debug(f"收到来自 {addr} 的数据,但未找到对应的本地连接") + + def forward_data(self, conn_id, client_sock): + """ + 转发本地TCP数据到UDP隧道 + :param conn_id: 连接ID + :param client_sock: 本地客户端套接字 + """ + try: + while True: + # 从本地客户端读取数据 + data = client_sock.recv(4096) + if not data: + break + + # 通过UDP发送给服务提供者 + logger.debug(f"发送数据到服务提供者: {self.provider_addr}") + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'conn_id': conn_id, + 'data': data.hex() # 十六进制编码二进制数据 + }).encode(), self.provider_addr) + except Exception as e: + logger.error(f"转发数据失败: {str(e)}") + finally: + client_sock.close() + if conn_id in self.active_connections: + del self.active_connections[conn_id] + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': conn_id + }).encode(), self.provider_addr) + logger.debug(f"连接 {conn_id} 已关闭") + + def udp_listener(self): + """ + 监听UDP消息并处理 + """ + while self.running: + try: + data, addr = self.udp_sock.recvfrom(65535) + logger.debug(f"收到来自 {addr} 的消息: {data}") + message = json.loads(data.decode()) + + # 使用字典映射处理不同消息类型 + action_handlers = { + 'punch_response': self.handle_punch_response, + 'data': self.handle_data, + 'stop_conn': self.handle_stop_conn, + 'punch': self.handel_punch + } + + # 提交任务到线程池 + if message.get('action') in action_handlers: + self.thread_pool.submit(action_handlers[message['action']], message, addr) + else: + logger.warning(f"收到未知消息: {message}") + except Exception as e: + logger.error(f"处理UDP消息时发生错误: {str(e)}") + + def run(self): + """ + 运行服务连接端 + """ + # 请求服务 + if not self.request_service(): + logger.error("服务请求失败,退出程序") + return + + # 执行打洞 + if not self.punch_hole(): + logger.error("打洞失败,退出程序") + return + + # 启动UDP监听线程 + threading.Thread(target=self.udp_listener, daemon=True).start() + + # 启动TCP监听线程 + threading.Thread(target=self.tcp_listener, daemon=True).start() + + # 保持主线程运行 + try: + while self.running: + time.sleep(1) + except KeyboardInterrupt: + self.running = False + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_client', + 'client_id': self.client_id + }).encode(), self.provider_addr) + self.udp_sock.close() + self.tcp_sock.close() + logger.info("服务连接端已停止") + + +if __name__ == '__main__': + # 配置信息 + COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP + SERVICE_NAME = "ssh-jk-54htrsd324n6" + # SERVICE_NAME = "terraria-jk-2cxht5" + # SERVICE_NAME = "minecraft-jk-ytsvb54u6" + # SERVICE_NAME = "alist-jk-5shf43h6fdg" + LOCAL_PORT = 12345 # 本地映射端口 + + connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) + connector.run() diff --git a/connector1.py b/connector1.py deleted file mode 100644 index 9c5fa32..0000000 --- a/connector1.py +++ /dev/null @@ -1,232 +0,0 @@ -import socket -import json -import threading -import time -import uuid - - -class ServiceConnector: - def __init__(self, coordinator_addr, service_name, local_port): - self.target_id = None - self.coordinator_addr = coordinator_addr - self.service_name = service_name - self.local_port = local_port - self.client_id = f"connector-{uuid.uuid4().hex[:8]}" - - # 创建UDP套接字用于协调通信 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) - - # 创建TCP套接字用于本地监听 - self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.tcp_sock.bind(('127.0.0.1', local_port)) - self.tcp_sock.listen(5) - print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") - - # 存储活动连接 - self.active_connections = {} - self.provider_addr = None - self.internal_port = None - self.running = True - - def request_service(self): - """向协调服务器请求服务""" - message = { - 'action': 'request', - 'service_name': self.service_name, - 'client_id': self.client_id - } - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待响应 - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] == 'success': - self.provider_addr = tuple(response['provider_addr']) - self.internal_port = response['internal_port'] - self.target_id = response['target_id'] - print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}") - return True - else: - print(f"服务请求失败: {response['message']}") - return False - - def punch_hole(self): - """执行UDP打洞""" - if not self.provider_addr: - return False - - # 请求打洞 - message = { - 'action': 'punch_request', - 'client_id': self.client_id, - 'target_id': self.target_id - } - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待协调服务器响应 - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] != 'success': - print(f"打洞请求失败: {response['message']}") - return False - - # 向服务提供者发送打洞包 - print(f"尝试打洞到 {self.provider_addr}...") - for _ in range(5): - self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr) - time.sleep(0.5) - - # 检查连通性 - self.udp_sock.settimeout(10.0) - try: - self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) - data, addr = self.udp_sock.recvfrom(1024) - if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: - print("打洞成功! 已建立UDP连接") - return True - else: - print(f"错误的打洞响应{data}") - return False - except socket.timeout: - print("打洞失败: 未收到响应") - return False - finally: - self.udp_sock.settimeout(None) - - def udp_listener(self): - """监听UDP消息""" - while self.running: - try: - data, addr = self.udp_sock.recvfrom(65535) - message = json.loads(data.decode()) - - if message['action'] == 'punch_response': - # 打洞响应 - 确认连通性 - print(f"收到来自 {addr} 的打洞响应") - elif message['action'] == 'data': - if message['conn_id'] in self.active_connections: - # 转发数据到本地客户端 - print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}") - self.active_connections[message['conn_id']].send(bytes.fromhex(message['data'])) - else: - print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': message['conn_id'] - }).encode(), addr) - elif message['action'] == 'stop_conn': - # 停止连接 - if message['conn_id'] in self.active_connections: - self.active_connections[message['conn_id']].close() - del self.active_connections[message['conn_id']] - print(f"已关闭本地连接 {message['conn_id']}") - else: - print(f"收到未知消息: {message}") - except Exception as e: - print(e) - - def tcp_listener(self): - """监听本地TCP连接""" - while self.running: - try: - client_sock, client_addr = self.tcp_sock.accept() - print(f"新的本地连接来自 {client_addr}") - - # 为每个连接生成唯一ID - conn_id = str(uuid.uuid4()) - - # 存储连接 - self.active_connections[conn_id] = client_sock - - # 请求服务提供者建立连接 - self.udp_sock.sendto(json.dumps({ - 'action': 'connect', - 'client_id': self.client_id, - 'conn_id': conn_id - }).encode(), self.provider_addr) - - time.sleep(0.5) - - # 启动数据转发线程 - threading.Thread( - target=self.forward_data, - args=(conn_id, client_sock), - daemon=True - ).start() - - except: - pass - - def forward_data(self, conn_id, client_sock): - """转发本地TCP数据到UDP隧道""" - CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 - try: - while True: - # 从本地客户端读取数据 - data = client_sock.recv(65535) - if not data: - break - - # 通过UDP发送给服务提供者 - print(f"发送数据到服务提供者: {self.provider_addr}") - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), self.provider_addr) - # 分片发送数据 - # for i in range(0, len(data), CHUNK_SIZE): - # chunk = data[i:i + CHUNK_SIZE] - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': chunk.hex() - # }).encode(), self.provider_addr) - except: - pass - finally: - client_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] - - def run(self): - """运行服务连接端""" - # 请求服务 - if not self.request_service(): - return - - # 执行打洞 - if not self.punch_hole(): - return - - # 启动UDP监听线程 - threading.Thread(target=self.udp_listener, daemon=True).start() - - # 启动TCP监听线程 - threading.Thread(target=self.tcp_listener, daemon=True).start() - - # 保持主线程运行 - try: - while self.running: - time.sleep(1) - except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_client', - 'client_id': self.client_id - }).encode(), self.provider_addr) - self.udp_sock.close() - self.tcp_sock.close() - print("服务连接端已停止") - - -if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "my_game_server" - LOCAL_PORT = 12345 # 本地映射端口 - - connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) - connector.run() diff --git a/coordinator.py b/coordinator.py index 566aa32..b465429 100644 --- a/coordinator.py +++ b/coordinator.py @@ -1,51 +1,81 @@ import socket import json import time +import logging from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +import threading + +# 设置日志配置 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) class CoordinatorServer: def __init__(self, host='0.0.0.0', port=5000): + """ + 初始化协调服务器。 + :param host: 服务器绑定的主机地址,默认为 '0.0.0.0'。 + :param port: 服务器监听的端口号,默认为 5000。 + """ self.host = host self.port = port - self.services = {} # 服务名 -> (公网地址, 内网端口) - self.clients = defaultdict(dict) # 客户端ID -> 信息 + self.clients = defaultdict(dict) # 客户端信息字典 + self.providers = defaultdict(dict) # 服务提供端信息字典 + self.services = defaultdict(tuple) # 服务名称与提供者信息的映射 self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock.bind((host, port)) + self.udp_sock.settimeout(1) # 设置 UDP 套接字接收超时时间为 1 秒 self.running = True - print(f"协调服务器运行在 {host}:{port}") + self.executor = ThreadPoolExecutor(max_workers=10) # 创建线程池 + logger.debug(f"协调服务器运行在 {host}:{port}") + + # 启动定时清理任务 + self.executor.submit(self.cleanup_expired_services) def handle_register(self, data, addr): - """处理服务注册请求""" + """ + 处理服务注册请求。 + :param data: 包含服务注册信息的字典。 + :param addr: 发送请求的客户端地址。 + """ try: - service_name = data['service_name'] - internal_port = data['internal_port'] - client_id = data['client_id'] + provider_id = data['provider_id'] + services = data['services'] # 记录客户端信息 - self.clients[client_id] = { + self.providers[provider_id] = { 'addr': addr, - 'service_name': service_name, - 'internal_port': internal_port, + 'services': services, 'last_seen': time.time() } - # 注册服务 - self.services[service_name] = (addr, internal_port) - print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}") + # 遍历 services 字典,记录每个服务的名称和端口 + for service_name, service_port in services.items(): + self.services[service_name] = (addr, service_port, provider_id) - return {'status': 'success', 'message': '服务注册成功'} + logger.info(f"注册来自{addr}:{services}") + + # 直接回复注册成功消息 + response = {'status': 'success', 'message': '服务注册成功'} + self.udp_sock.sendto(json.dumps(response).encode(), addr) except Exception as e: - return {'status': 'error', 'message': str(e)} + response = {'status': 'error', 'message': str(e)} + self.udp_sock.sendto(json.dumps(response).encode(), addr) def handle_request(self, data, addr): - """处理服务请求""" + """ + 处理服务请求。 + :param data: 包含服务请求信息的字典。 + :param addr: 发送请求的客户端地址。 + """ try: service_name = data['service_name'] client_id = data['client_id'] if service_name not in self.services: - return {'status': 'error', 'message': '服务未找到'} + response = {'status': 'error', 'message': '服务未找到'} + self.udp_sock.sendto(json.dumps(response).encode(), addr) # 记录请求客户端信息 self.clients[client_id] = { @@ -55,55 +85,118 @@ class CoordinatorServer: } # 获取服务提供者的信息 - provider_addr, internal_port = self.services[service_name] - target_id = [ - client_id - for client_id, info in self.clients.items() - if info['service_name'] == service_name - ][0] + provider_addr, internal_port, provider_id = self.services[service_name] - print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") + logger.debug(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") - return { + response = { 'status': 'success', 'provider_addr': provider_addr, 'internal_port': internal_port, - 'target_id': target_id + 'provider_id': provider_id } + self.udp_sock.sendto(json.dumps(response).encode(), addr) except Exception as e: - return {'status': 'error', 'message': str(e)} + response = {'status': 'error', 'message': str(e)} + self.udp_sock.sendto(json.dumps(response).encode(), addr) def handle_punch_request(self, data, addr): - """处理打洞请求""" + """ + 处理打洞请求。 + :param data: 包含打洞请求信息的字典。 + :param addr: 发送请求的客户端地址。 + """ try: client_id = data['client_id'] - target_id = data['target_id'] + provider_id = data['provider_id'] # 获取目标客户端信息 - if target_id not in self.clients: - return {'status': 'error', 'message': '目标客户端未找到'} + if provider_id not in self.providers: + response = {'status': 'error', 'message': '目标提供端未找到'} + self.udp_sock.sendto(json.dumps(response).encode(), addr) - target_addr = self.clients[target_id]['addr'] + provider_addr = self.providers[provider_id]['addr'] - print(f"打洞请求: {addr} -> {target_addr}") + logger.info(f"打洞请求: {addr} -> {provider_id}") # 通知双方对方的地址 self.udp_sock.sendto(json.dumps({ 'action': 'punch_request', 'client_id': client_id, 'client_addr': addr - }).encode(), target_addr) + }).encode(), provider_addr) - return { + self.udp_sock.sendto(json.dumps({ 'status': 'success', - 'target_addr': target_addr - } + 'provider_addr': provider_addr + }).encode(), addr) except Exception as e: - return {'status': 'error', 'message': str(e)} + logger.error(f"处理打洞请求时出错: {e}") + response = {'status': 'error', 'message': str(e)} + self.udp_sock.sendto(json.dumps(response).encode(), addr) + + def handle_stop_provider(self, data, addr): + """ + 处理停止服务请求。 + :param data: 包含停止服务信息的字典。 + :param addr: 发送请求的客户端地址。 + """ + try: + service_name = data['service_name'] + self.services.pop(service_name, None) + response = {'status': 'success', 'message': '服务停止成功'} + self.udp_sock.sendto(json.dumps(response).encode(), addr) + except Exception as e: + response = {'status': 'error', 'message': str(e)} + self.udp_sock.sendto(json.dumps(response).encode(), addr) + + def handle_heartbeat(self, data, addr): + """ + 处理心跳包。 + :param data: 包含心跳信息的字典。 + :param addr: 发送心跳的客户端地址。 + """ + try: + provider_id = data['provider_id'] + self.providers[provider_id]['last_seen'] = time.time() + response = {'status': 'success', 'message': '心跳更新成功'} + self.udp_sock.sendto(json.dumps(response).encode(), addr) + except Exception as e: + response = {'status': 'error', 'message': str(e)} + self.udp_sock.sendto(json.dumps(response).encode(), addr) + + def cleanup_expired_services(self): + """ + 定时清理过期服务。 + 每20秒检查一次,移除超过30秒未更新心跳的服务。 + """ + while self.running: + time.sleep(60) # 每60秒检查一次 + current_time = time.time() + expired_providers = [] + for provider_id, provider in self.providers.items(): + if current_time - provider['last_seen'] > 60: # 心跳包最后更新时间大于30秒 + expired_providers.append(provider_id) + logger.info(f"服务过期: {provider['addr']}") + for provider_id in expired_providers: + provider = self.providers[provider_id] + for service_name in provider['services'].keys(): # 使用新的 services 字段 + self.services.pop(service_name, None) + self.providers.pop(provider_id, None) def run(self): - """运行协调服务器""" - print("协调服务器已启动,等待连接...") + """ + 运行协调服务器。 + """ + logger.info(f"协调服务器已启动,端口{self.udp_sock.getsockname()[1]},等待连接...") + action_handlers = { + 'register': self.handle_register, # 服务注册处理行为 + 'request': self.handle_request, # 服务请求处理行为 + 'punch_request': self.handle_punch_request, # 打洞处理行为 + 'stop_provider': self.handle_stop_provider, # 停止服务行为 + 'heartbeat': self.handle_heartbeat # 心跳处理行为 + } + while self.running: try: data, addr = self.udp_sock.recvfrom(4096) @@ -111,30 +204,56 @@ class CoordinatorServer: message = json.loads(data.decode()) action = message.get('action') - if action == 'register': - response = self.handle_register(message, addr) - elif action == 'request': - response = self.handle_request(message, addr) - elif action == 'punch_request': - response = self.handle_punch_request(message, addr) - elif action == 'stop_provider': - self.services.pop(message['service_name'], None) - response = {'status': 'success', 'message': '服务停止成功'} - else: - response = {'status': 'error', 'message': '无效操作'} - - self.udp_sock.sendto(json.dumps(response).encode(), addr) + handler = action_handlers.get(action) + if handler: # 如果存在对应的处理行为,则执行它 + self.executor.submit(handler, message, addr).result() + else: # 如果没有对应的处理行为,则返回错误响应 + self.udp_sock.sendto(json.dumps({ + 'status': 'error', + 'message': '无效操作'}).encode(), addr) except json.JSONDecodeError: self.udp_sock.sendto(json.dumps({ 'status': 'error', 'message': '无效的JSON数据' }).encode(), addr) + except socket.timeout: # 捕获 UDP 接收超时异常 + pass # 不做任何处理,允许主线程继续执行 except Exception as e: - print(f"服务器错误: {str(e)}") + logger.debug(f"服务器错误: {str(e)}") - self.udp_sock.close() + def start(self): + """ + 启动协调服务器。 + """ + # 创建线程运行 run 方法 + server_thread = threading.Thread(target=self.run) + server_thread.start() + + try: + # 主线程捕获键盘打断信号 + while self.running: + time.sleep(1) # 防止主线程空转 + except KeyboardInterrupt: + logger.info("检测到键盘打断,准备退出...") + self.running = False + + # 通知所有提供端停止服务 + for provider_id, provider_info in self.providers.items(): + try: + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_provider', + }).encode(), provider_info['addr']) + logger.info(f"已通知提供端 {provider_id} 停止服务") + except Exception as e: + logger.error(f"通知提供端 {provider_id} 停止服务时出错: {e}") + server_thread.join() # 等待服务器线程退出 + + # 关闭线程池和套接字 + self.executor.shutdown() + self.udp_sock.close() + logger.info("协调服务器已安全退出") if __name__ == '__main__': server = CoordinatorServer() - server.run() + server.start() diff --git a/provider.py b/provider.py new file mode 100644 index 0000000..e139334 --- /dev/null +++ b/provider.py @@ -0,0 +1,348 @@ +import socket +import json +import threading +import time +import uuid +from concurrent.futures import ThreadPoolExecutor +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class ServiceProvider: + def __init__(self, coordinator_addr, services): + """ + 初始化服务提供者 + :param coordinator_addr: 协调服务器地址 (IP, port) + :param services: 提供的服务列表 {服务名: 端口号} + """ + self.provider_id = f"provider-{uuid.uuid4().hex[:8]}" + self.coordinator_addr = coordinator_addr + self.services = services + + # 创建UDP套接字用于协调通信 + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_sock.bind(('0.0.0.0', 0)) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + + # 创建线程池用于处理UDP消息 + self.thread_pool = ThreadPoolExecutor(max_workers=10) + + self.clients = {} + # 存储活动连接 + self.active_connections = {} + self.running = True + + # 心跳线程 + self.heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True) + + def send_heartbeat(self): + """ + 发送心跳包到协调服务器 + """ + while self.running: + try: + message = { + 'action': 'heartbeat', + 'provider_id': self.provider_id + } + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + logger.debug(f"发送心跳包给协调服务器 {self.coordinator_addr}") + time.sleep(20) # 每20秒发送一次心跳包 + except Exception as e: + logger.error(f"发送心跳包失败: {str(e)}") + + def register_service(self): + """ + 向协调服务器注册服务 + :return: 注册是否成功 + """ + message = { + 'action': 'register', + 'services': self.services, + 'provider_id': self.provider_id + } + logger.info(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.services}'") + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + + # 等待响应 + try: + data, _ = self.udp_sock.recvfrom(4096) + response = json.loads(data.decode()) + if response['status'] == 'success': + logger.info(f"服务 '{self.services}' 注册成功") + return True + else: + logger.error(f"注册失败: {response['message']}") + return False + except Exception as e: + logger.error(f"注册服务时发生错误: {str(e)}") + return False + + def handle_punch(self, message, addr): + """ + 处理打洞请求 + :param message: 打洞请求消息 + :param addr: 客户端地址 + """ + self.udp_sock.sendto(json.dumps( + {'action': 'punch_response', + 'client_id': message['client_id'], + 'provider_id': self.provider_id + }).encode(), addr) + logger.debug(f"收到来自 {addr} 的打洞请求,已响应") + + def handle_punch_response(self, _, addr): + """ + 处理打洞响应 + :param addr: 客户端地址 + """ + logger.debug(f"收到来自 {addr} 的打洞响应") + + def handle_connect_request(self, message, addr): + """ + 处理连接请求 + :param message: 连接请求消息 + :param addr: 客户端地址 + """ + conn_id = message['conn_id'] + client_id = message['client_id'] + service_name = message['service_name'] + logger.debug(f"收到来自 {addr} 的连接请求") + threading.Thread( + target=self.handle_connection, + args=(conn_id, addr, client_id, service_name), + daemon=True + ).start() + + def handle_punch_request(self, message, _): + """ + 处理打洞请求 + :param message: 打洞请求消息 + """ + for i in range(10): + try: + self.udp_sock.sendto(json.dumps({ + 'action': 'punch', + 'client_id': message['client_id'], + 'provider_id': self.provider_id, + }).encode(), tuple(message['client_addr'])) + time.sleep(0.5) + except Exception as e: + logger.error(f"打洞失败: {str(e)}") + time.sleep(1) + + def handle_data(self, message, addr): + """ + 处理数据消息 + :param message: 数据消息 + :param addr: 客户端地址 + """ + conn_id = message['conn_id'] + data = bytes.fromhex(message['data']) + if conn_id in self.active_connections: + # 转发数据到本地服务 + logger.debug(f"收到来自 {addr} 的数据,转发到本地服务") + self.active_connections[conn_id]['local_sock'].sendall(data) + else: + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': conn_id + }).encode(), addr) + logger.debug(f"收到来自 {addr} 的数据,但未找到对应的连接") + + def handle_stop_conn(self, message, _): + """ + 处理停止连接请求 + :param message: 停止连接请求消息 + """ + conn_id = message['conn_id'] + if conn_id in self.active_connections: + self.active_connections[conn_id]['local_sock'].close() + self.active_connections.pop(conn_id, None) + + def handle_stop_client(self, message, addr): + """ + 处理停止客户端请求 + :param message: 停止客户端请求消息 + :param addr: 客户端地址 + """ + client_id = message['client_id'] + for conn_id, conn_info in self.active_connections.items(): + if conn_info['client_id'] == client_id: + conn_info['local_sock'].close() + self.active_connections.pop(conn_id, None) + + def handle_stop_provider(self, message, _): + """ + 处理停止服务提供者请求 + :param message: 停止服务提供者请求消息 + """ + logger.info("收到停止服务提供者请求,正在关闭所有连接...") + for conn_id, conn_info in self.active_connections.items(): + conn_info['local_sock'].close() + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': conn_id + }).encode(), conn_info['client_addr']) + self.active_connections.clear() + self.running = False + self.udp_sock.close() + self.thread_pool.shutdown(wait=True) + logger.info("服务提供者已停止") + + def handle_connection(self, conn_id, client_addr, client_id, service_name): + """ + 处理来自客户端的连接 + :param conn_id: 连接ID + :param client_addr: 客户端地址 + :param client_id: 客户端ID + :param service_name: 服务名称 + """ + internal_port = self.services[service_name] + try: + # 接受本地服务连接 + local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + local_sock.connect(('127.0.0.1', internal_port)) + if not local_sock: + logger.error("无法连接到本地服务") + return + + # 创建与客户端的UDP隧道 + logger.debug(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', internal_port)}") + + # 存储连接 + self.active_connections[conn_id] = { + 'local_sock': local_sock, + 'client_addr': client_addr, + 'client_id': client_id + } + + # 通知客户端连接就绪 + self.udp_sock.sendto(json.dumps({ + 'action': 'connected', + 'client_id': conn_id + }).encode(), client_addr) + + # 启动数据转发 + self.forward_data(conn_id, local_sock, client_addr) + except Exception as e: + logger.error(f"连接失败: {str(e)}") + self.udp_sock.sendto(json.dumps({ + 'action': 'connect_failed', + 'client_id': conn_id, + 'message': str(e) + }).encode(), client_addr) + + def forward_data(self, conn_id, local_sock, client_addr): + """ + 转发TCP数据到UDP隧道 + :param conn_id: 连接ID + :param local_sock: 本地服务套接字 + :param client_addr: 客户端地址 + """ + try: + while True: + # 从本地服务读取数据 + data = local_sock.recv(4096) + if not data: + break + + # 通过UDP发送给客户端 + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'conn_id': conn_id, + 'data': data.hex() # 十六进制编码二进制数据 + }).encode(), client_addr) + logger.debug(f"转发数据给客户端{client_addr}") + except Exception as e: + logger.error(f"转发数据失败: {str(e)}") + finally: + local_sock.close() + if conn_id in self.active_connections: + del self.active_connections[conn_id] + logger.debug(f"连接 {conn_id} 已关闭") + + def udp_listener(self): + """ + 监听UDP消息并处理 + """ + data = None + while self.running: + try: + data, addr = self.udp_sock.recvfrom(4096) + logger.debug(f"收到来自 {addr} 的消息: {data}") + message = json.loads(data.decode()) + + # 使用字典映射处理不同消息类型 + action_handlers = { + 'punch': self.handle_punch, + 'punch_check': self.handle_punch, + 'punch_response': self.handle_punch_response, + 'connect': self.handle_connect_request, + 'punch_request': self.handle_punch_request, + 'data': self.handle_data, + 'stop_conn': self.handle_stop_conn, + 'stop_client': self.handle_stop_client, + 'stop_provider': self.handle_stop_provider, + } + + # 提交任务到线程池 + if message.get('action') in action_handlers: + self.thread_pool.submit(action_handlers[message['action']], message, addr) + elif message.get('status') == 'error': + logger.error(f"来自 {addr} 的错误消息: {message}") + elif message.get('status') == 'success': + logger.debug(f"来自 {addr} 的成功消息: {message}") + else: + logger.warning(f"收到未知消息: {message}") + except Exception as e: + logger.error(f"处理UDP消息时发生错误: {str(e)}") + if data: + logger.error(f"无法处理消息: {data}") + + def run(self): + """ + 运行服务提供端 + """ + # 注册服务 + if not self.register_service(): + logger.error("服务注册失败,退出程序") + return + + # 启动UDP监听线程 + threading.Thread(target=self.udp_listener, daemon=True).start() + + # 启动心跳线程 + self.heartbeat_thread.start() + + # 保持主线程运行 + try: + while self.running: + time.sleep(1) + except KeyboardInterrupt: + self.running = False + self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) + for conn_id, conn_info in self.active_connections.items(): + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': conn_id + }).encode(), conn_info['client_addr']) + self.udp_sock.close() + logger.info("服务提供端已停止") + + +if __name__ == '__main__': + # 配置信息 + COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP + SERVICES = { + 'terraria-jk-2cxht5': 5001, + 'minecraft-jk-ytsvb54u6': 5002, + 'alist-jk-5shf43h6fdg': 5244, + 'ssh-jk-54htrsd324n6': 22 + } + + provider = ServiceProvider(COORDINATOR_ADDR, SERVICES) + provider.run() diff --git a/provider1.py b/provider1.py deleted file mode 100644 index 2517164..0000000 --- a/provider1.py +++ /dev/null @@ -1,220 +0,0 @@ -import socket -import json -import threading -import time -import uuid - - -class ServiceProvider: - def __init__(self, coordinator_addr, service_name, internal_port): - self.coordinator_addr = coordinator_addr - self.service_name = service_name - self.internal_port = internal_port - self.client_id = f"provider-{uuid.uuid4().hex[:8]}" - - # 创建UDP套接字用于协调通信 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) - - # # 创建TCP套接字用于服务监听 - # self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.tcp_sock.bind(('0.0.0.0', internal_port)) - # self.tcp_sock.listen(5) - print(f"服务端监听在端口 {internal_port}") - - # 存储活动连接 - self.active_connections = {} - self.running = True - - def register_service(self): - """向协调服务器注册服务""" - message = { - 'action': 'register', - 'service_name': self.service_name, - 'internal_port': self.internal_port, - 'external_port': self.udp_sock.getsockname()[1], - 'client_id': self.client_id - } - print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'") - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待响应 - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] == 'success': - print(f"服务 '{self.service_name}' 注册成功") - else: - print(f"注册失败: {response['message']}") - - def udp_listener(self): - """监听UDP消息""" - while self.running: - try: - data, addr = self.udp_sock.recvfrom(4096) - print(f"收到来自 {addr} 的消息: {data.decode()}") - message = json.loads(data.decode()) - - if message.get('action') == 'punch' or message.get('action') == 'punch_check': - # 打洞请求 - 发送响应以确认连通性 - self.udp_sock.sendto(json.dumps( - {'action': 'punch_response', - 'client_id': message['client_id'] - }).encode(), addr) - print(f"收到来自 {addr} 的打洞请求,已响应") - elif message.get('action') == 'punch_response': - # 打洞响应 - 确认打洞成功 - print(f"收到来自 {addr} 的打洞响应") - elif message.get('action') == 'connect': - # 新的连接请求 - conn_id = message['conn_id'] - client_id = message['client_id'] - print(f"收到来自 {addr} 的连接请求") - threading.Thread( - target=self.handle_connection, - args=(conn_id, addr, client_id), - daemon=True - ).start() - elif message.get('action') == 'punch_request': - client_addr = message['client_addr'] - print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞") - self.punch_hole(message) - elif message.get('action') == 'data': - # 接收到来自客户端的数据 - conn_id = message['conn_id'] - data = bytes.fromhex(message['data']) - if conn_id in self.active_connections: - # 转发数据到本地服务 - print(f"收到来自 {addr} 的数据,转发到本地服务") - self.active_connections[conn_id]['local_sock'].sendall(data) - else: - print(f"收到来自 {addr} 的数据,但未找到对应的连接") - elif message.get('action') == 'stop_conn': - conn_id = message['conn_id'] - if conn_id in self.active_connections: - self.active_connections[conn_id]['local_sock'].close() - self.active_connections.pop(conn_id) - elif message.get('action') == 'stop_client': - client_id = message['client_id'] - for conn_id, conn_info in self.active_connections.items(): - if conn_info['client_id'] == client_id: - conn_info['local_sock'].close() - self.active_connections.pop(conn_id) - else: - print(f"收到未知消息: {message}") - except Exception as e: - print(e) - pass - - def handle_connection(self, conn_id, client_addr, client_id): - """处理来自客户端的连接""" - try: - # 接受本地服务连接 - local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - local_sock.connect(('127.0.0.1', self.internal_port)) - if not local_sock: - print("无法连接到本地服务") - return - - # 创建与客户端的UDP隧道 - print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}") - - # 存储连接 - self.active_connections[conn_id] = { - 'local_sock': local_sock, - 'client_addr': client_addr, - 'client_id': client_id - } - - # 通知客户端连接就绪 - self.udp_sock.sendto(json.dumps({ - 'action': 'connected', - 'client_id': conn_id - }).encode(), client_addr) - - # 启动数据转发 - self.forward_data(conn_id, local_sock, client_addr) - except Exception as e: - print(f"连接失败: {str(e)}") - self.udp_sock.sendto(json.dumps({ - 'action': 'connect_failed', - 'client_id': conn_id, - 'message': str(e) - }).encode(), client_addr) - - def forward_data(self, conn_id, local_sock, client_addr): - """转发TCP数据到UDP隧道""" - CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 - try: - while True: - # 从本地服务读取数据 - data = local_sock.recv(65535) - if not data: - break - - # 通过UDP发送给客户端 - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), client_addr) - print(f"转发数据给客户端{client_addr}") - # for i in range(0, len(data), CHUNK_SIZE): - # chunk = data[i:i + CHUNK_SIZE] - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': chunk.hex() - # }).encode(), client_addr) - except Exception as e: - print(f"转发数据失败: {str(e)}") - finally: - local_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] - print(f"连接 {conn_id} 已关闭") - - def run(self): - """运行服务提供端""" - # 注册服务 - self.register_service() - - # 启动UDP监听线程 - threading.Thread(target=self.udp_listener, daemon=True).start() - - # 保持主线程运行 - try: - while self.running: - time.sleep(1) - except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) - for conn_id, conn_info in self.active_connections.items(): - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), conn_info['client_addr']) - self.udp_sock.close() - print("服务提供端已停止") - - def punch_hole(self, message): - for i in range(5): - try: - self.udp_sock.sendto(json.dumps({ - 'action': 'punch', - 'client_id': message['client_id'] - }).encode(), tuple(message['client_addr'])) - time.sleep(0.2) - except Exception as e: - print(f"打洞失败: {str(e)}") - time.sleep(1) - - -if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "my_game_server" - INTERNAL_PORT = 5001 # 内网游戏服务器端口 - - provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) - provider.run()