import socket import json import threading import time import uuid class ServiceConnector: def __init__(self, coordinator_addr, service_name, local_port): self.target_id = None self.coordinator_addr = coordinator_addr self.service_name = service_name self.local_port = local_port self.client_id = f"connector-{uuid.uuid4().hex[:8]}" # 创建UDP套接字用于协调通信 self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock.bind(('0.0.0.0', 0)) self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) # 创建TCP套接字用于本地监听 self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp_sock.bind(('127.0.0.1', local_port)) self.tcp_sock.listen(5) print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") # 存储活动连接 self.active_connections = {} self.provider_addr = None self.internal_port = None self.running = True def request_service(self): """向协调服务器请求服务""" message = { 'action': 'request', 'service_name': self.service_name, 'client_id': self.client_id } self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) # 等待响应 data, _ = self.udp_sock.recvfrom(4096) response = json.loads(data.decode()) if response['status'] == 'success': self.provider_addr = tuple(response['provider_addr']) self.internal_port = response['internal_port'] self.target_id = response['target_id'] print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}") return True else: print(f"服务请求失败: {response['message']}") return False def punch_hole(self): """执行UDP打洞""" if not self.provider_addr: return False # 请求打洞 message = { 'action': 'punch_request', 'client_id': self.client_id, 'target_id': self.target_id } self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) # 等待协调服务器响应 data, _ = self.udp_sock.recvfrom(4096) response = json.loads(data.decode()) if response['status'] != 'success': print(f"打洞请求失败: {response['message']}") return False # 向服务提供者发送打洞包 print(f"尝试打洞到 {self.provider_addr}...") for _ in range(5): self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr) time.sleep(0.5) # 检查连通性 self.udp_sock.settimeout(2.0) try: self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) data, addr = self.udp_sock.recvfrom(1024) if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: print("打洞成功! 已建立UDP连接") return True else: print(f"错误的打洞响应{data}") return False except socket.timeout: print("打洞失败: 未收到响应") return False finally: self.udp_sock.settimeout(None) def udp_listener(self): """监听UDP消息""" while self.running: try: data, addr = self.udp_sock.recvfrom(4096) message = json.loads(data.decode()) if message['action'] == 'punch_response': # 打洞响应 - 确认连通性 print(f"收到来自 {addr} 的打洞响应") elif message['action'] == 'data': if message['conn_id'] in self.active_connections: # 转发数据到本地客户端 self.active_connections[message['conn_id']].sendall(bytes.fromhex(message['data'])) else: print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") except Exception as e: print(e) def tcp_listener(self): """监听本地TCP连接""" while self.running: try: client_sock, client_addr = self.tcp_sock.accept() print(f"新的本地连接来自 {client_addr}") # 为每个连接生成唯一ID conn_id = str(uuid.uuid4()) # 存储连接 self.active_connections[conn_id] = client_sock # 请求服务提供者建立连接 self.udp_sock.sendto(json.dumps({ 'action': 'connect', 'conn_id': conn_id }).encode(), self.provider_addr) time.sleep(0.5) # 启动数据转发线程 threading.Thread( target=self.forward_data, args=(conn_id, client_sock), daemon=True ).start() except: pass def forward_data(self, conn_id, client_sock): """转发本地TCP数据到UDP隧道""" CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小 try: while True: # 从本地客户端读取数据 data = client_sock.recv(4096) if not data: break # # 通过UDP发送给服务提供者 # self.udp_sock.sendto(json.dumps({ # 'action': 'data', # 'conn_id': conn_id, # 'data': data.hex() # 十六进制编码二进制数据 # }).encode(), self.provider_addr) # 分片发送数据 for i in range(0, len(data), CHUNK_SIZE): chunk = data[i:i + CHUNK_SIZE] self.udp_sock.sendto(json.dumps({ 'action': 'data', 'conn_id': conn_id, 'data': chunk.hex() }).encode(), self.provider_addr) except: pass finally: client_sock.close() if conn_id in self.active_connections: del self.active_connections[conn_id] def run(self): """运行服务连接端""" # 请求服务 if not self.request_service(): return # 执行打洞 if not self.punch_hole(): return # 启动UDP监听线程 threading.Thread(target=self.udp_listener, daemon=True).start() # 启动TCP监听线程 threading.Thread(target=self.tcp_listener, daemon=True).start() # 保持主线程运行 try: while self.running: time.sleep(1) except KeyboardInterrupt: self.running = False self.udp_sock.close() self.tcp_sock.close() print("服务连接端已停止") if __name__ == '__main__': # 配置信息 COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP SERVICE_NAME = "my_game_server" LOCAL_PORT = 12345 # 本地映射端口 connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) connector.run()