import socket import json import threading import time import uuid class ServiceProvider: def __init__(self, coordinator_addr, service_name, internal_port): self.coordinator_addr = coordinator_addr self.service_name = service_name self.internal_port = internal_port self.client_id = f"provider-{uuid.uuid4().hex[:8]}" # 创建UDP套接字用于协调通信 self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock.bind(('0.0.0.0', 0)) self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) # # 创建TCP套接字用于服务监听 # self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # self.tcp_sock.bind(('0.0.0.0', internal_port)) # self.tcp_sock.listen(5) print(f"服务端监听在端口 {internal_port}") # 存储活动连接 self.active_connections = {} self.running = True def register_service(self): """向协调服务器注册服务""" message = { 'action': 'register', 'service_name': self.service_name, 'internal_port': self.internal_port, 'external_port': self.udp_sock.getsockname()[1], 'client_id': self.client_id } print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'") self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) # 等待响应 data, _ = self.udp_sock.recvfrom(4096) response = json.loads(data.decode()) if response['status'] == 'success': print(f"服务 '{self.service_name}' 注册成功") else: print(f"注册失败: {response['message']}") def udp_listener(self): """监听UDP消息""" while self.running: try: data, addr = self.udp_sock.recvfrom(4096) print(f"收到来自 {addr} 的消息: {data.decode()}") message = json.loads(data.decode()) if message.get('action') == 'punch' or message.get('action') == 'punch_check': # 打洞请求 - 发送响应以确认连通性 self.udp_sock.sendto(json.dumps( {'action': 'punch_response', 'client_id': message['client_id'] }).encode(), addr) print(f"收到来自 {addr} 的打洞请求,已响应") elif message.get('action') == 'punch_response': # 打洞响应 - 确认打洞成功 print(f"收到来自 {addr} 的打洞响应") elif message.get('action') == 'connect': # 新的连接请求 conn_id = message['conn_id'] client_id = message['client_id'] print(f"收到来自 {addr} 的连接请求") threading.Thread( target=self.handle_connection, args=(conn_id, addr, client_id), daemon=True ).start() elif message.get('action') == 'punch_request': client_addr = message['client_addr'] print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞") self.punch_hole(message) elif message.get('action') == 'data': # 接收到来自客户端的数据 conn_id = message['conn_id'] data = bytes.fromhex(message['data']) if conn_id in self.active_connections: # 转发数据到本地服务 print(f"收到来自 {addr} 的数据,转发到本地服务") self.active_connections[conn_id]['local_sock'].sendall(data) else: print(f"收到来自 {addr} 的数据,但未找到对应的连接") elif message.get('action') == 'stop_conn': conn_id = message['conn_id'] if conn_id in self.active_connections: self.active_connections[conn_id]['local_sock'].close() self.active_connections.pop(conn_id) elif message.get('action') == 'stop_client': client_id = message['client_id'] for conn_id, conn_info in self.active_connections.items(): if conn_info['client_id'] == client_id: conn_info['local_sock'].close() self.active_connections.pop(conn_id) else: print(f"收到未知消息: {message}") except Exception as e: print(e) pass def handle_connection(self, conn_id, client_addr, client_id): """处理来自客户端的连接""" try: # 接受本地服务连接 local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) local_sock.connect(('127.0.0.1', self.internal_port)) if not local_sock: print("无法连接到本地服务") return # 创建与客户端的UDP隧道 print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}") # 存储连接 self.active_connections[conn_id] = { 'local_sock': local_sock, 'client_addr': client_addr, 'client_id': client_id } # 通知客户端连接就绪 self.udp_sock.sendto(json.dumps({ 'action': 'connected', 'client_id': conn_id }).encode(), client_addr) # 启动数据转发 self.forward_data(conn_id, local_sock, client_addr) except Exception as e: print(f"连接失败: {str(e)}") self.udp_sock.sendto(json.dumps({ 'action': 'connect_failed', 'client_id': conn_id, 'message': str(e) }).encode(), client_addr) def forward_data(self, conn_id, local_sock, client_addr): """转发TCP数据到UDP隧道""" CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 try: while True: # 从本地服务读取数据 data = local_sock.recv(65535) if not data: break # 通过UDP发送给客户端 self.udp_sock.sendto(json.dumps({ 'action': 'data', 'conn_id': conn_id, 'data': data.hex() # 十六进制编码二进制数据 }).encode(), client_addr) print(f"转发数据给客户端{client_addr}") # for i in range(0, len(data), CHUNK_SIZE): # chunk = data[i:i + CHUNK_SIZE] # self.udp_sock.sendto(json.dumps({ # 'action': 'data', # 'conn_id': conn_id, # 'data': chunk.hex() # }).encode(), client_addr) except Exception as e: print(f"转发数据失败: {str(e)}") finally: local_sock.close() if conn_id in self.active_connections: del self.active_connections[conn_id] print(f"连接 {conn_id} 已关闭") def run(self): """运行服务提供端""" # 注册服务 self.register_service() # 启动UDP监听线程 threading.Thread(target=self.udp_listener, daemon=True).start() # 保持主线程运行 try: while self.running: time.sleep(1) except KeyboardInterrupt: self.running = False self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) for conn_id, conn_info in self.active_connections.items(): self.udp_sock.sendto(json.dumps({ 'action': 'stop_conn', 'conn_id': conn_id }).encode(), conn_info['client_addr']) self.udp_sock.close() print("服务提供端已停止") def punch_hole(self, message): for i in range(5): try: self.udp_sock.sendto(json.dumps({ 'action': 'punch', 'client_id': message['client_id'] }).encode(), tuple(message['client_addr'])) time.sleep(0.2) except Exception as e: print(f"打洞失败: {str(e)}") time.sleep(1) if __name__ == '__main__': # 配置信息 COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP SERVICE_NAME = "my_game_server" INTERNAL_PORT = 5001 # 内网游戏服务器端口 provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) provider.run()