diff --git a/connector1.py b/connector.py similarity index 100% rename from connector1.py rename to connector.py diff --git a/coordinator.py b/coordinator.py index 566aa32..e69de29 100644 --- a/coordinator.py +++ b/coordinator.py @@ -1,140 +0,0 @@ -import socket -import json -import time -from collections import defaultdict - - -class CoordinatorServer: - def __init__(self, host='0.0.0.0', port=5000): - self.host = host - self.port = port - self.services = {} # 服务名 -> (公网地址, 内网端口) - self.clients = defaultdict(dict) # 客户端ID -> 信息 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind((host, port)) - self.running = True - print(f"协调服务器运行在 {host}:{port}") - - def handle_register(self, data, addr): - """处理服务注册请求""" - try: - service_name = data['service_name'] - internal_port = data['internal_port'] - client_id = data['client_id'] - - # 记录客户端信息 - self.clients[client_id] = { - 'addr': addr, - 'service_name': service_name, - 'internal_port': internal_port, - 'last_seen': time.time() - } - - # 注册服务 - self.services[service_name] = (addr, internal_port) - print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}") - - return {'status': 'success', 'message': '服务注册成功'} - except Exception as e: - return {'status': 'error', 'message': str(e)} - - def handle_request(self, data, addr): - """处理服务请求""" - try: - service_name = data['service_name'] - client_id = data['client_id'] - - if service_name not in self.services: - return {'status': 'error', 'message': '服务未找到'} - - # 记录请求客户端信息 - self.clients[client_id] = { - 'addr': addr, - 'service_name': service_name, - 'last_seen': time.time() - } - - # 获取服务提供者的信息 - provider_addr, internal_port = self.services[service_name] - target_id = [ - client_id - for client_id, info in self.clients.items() - if info['service_name'] == service_name - ][0] - - print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") - - return { - 'status': 'success', - 'provider_addr': provider_addr, - 'internal_port': internal_port, - 'target_id': target_id - } - except Exception as e: - return {'status': 'error', 'message': str(e)} - - def handle_punch_request(self, data, addr): - """处理打洞请求""" - try: - client_id = data['client_id'] - target_id = data['target_id'] - - # 获取目标客户端信息 - if target_id not in self.clients: - return {'status': 'error', 'message': '目标客户端未找到'} - - target_addr = self.clients[target_id]['addr'] - - print(f"打洞请求: {addr} -> {target_addr}") - - # 通知双方对方的地址 - self.udp_sock.sendto(json.dumps({ - 'action': 'punch_request', - 'client_id': client_id, - 'client_addr': addr - }).encode(), target_addr) - - return { - 'status': 'success', - 'target_addr': target_addr - } - except Exception as e: - return {'status': 'error', 'message': str(e)} - - def run(self): - """运行协调服务器""" - print("协调服务器已启动,等待连接...") - while self.running: - try: - data, addr = self.udp_sock.recvfrom(4096) - try: - message = json.loads(data.decode()) - action = message.get('action') - - if action == 'register': - response = self.handle_register(message, addr) - elif action == 'request': - response = self.handle_request(message, addr) - elif action == 'punch_request': - response = self.handle_punch_request(message, addr) - elif action == 'stop_provider': - self.services.pop(message['service_name'], None) - response = {'status': 'success', 'message': '服务停止成功'} - else: - response = {'status': 'error', 'message': '无效操作'} - - self.udp_sock.sendto(json.dumps(response).encode(), addr) - except json.JSONDecodeError: - self.udp_sock.sendto(json.dumps({ - 'status': 'error', - 'message': '无效的JSON数据' - }).encode(), addr) - except Exception as e: - print(f"服务器错误: {str(e)}") - - self.udp_sock.close() - - -if __name__ == '__main__': - server = CoordinatorServer() - server.run() diff --git a/provider.py b/provider.py new file mode 100644 index 0000000..e69de29 diff --git a/provider1.py b/provider1.py deleted file mode 100644 index 2517164..0000000 --- a/provider1.py +++ /dev/null @@ -1,220 +0,0 @@ -import socket -import json -import threading -import time -import uuid - - -class ServiceProvider: - def __init__(self, coordinator_addr, service_name, internal_port): - self.coordinator_addr = coordinator_addr - self.service_name = service_name - self.internal_port = internal_port - self.client_id = f"provider-{uuid.uuid4().hex[:8]}" - - # 创建UDP套接字用于协调通信 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) - - # # 创建TCP套接字用于服务监听 - # self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.tcp_sock.bind(('0.0.0.0', internal_port)) - # self.tcp_sock.listen(5) - print(f"服务端监听在端口 {internal_port}") - - # 存储活动连接 - self.active_connections = {} - self.running = True - - def register_service(self): - """向协调服务器注册服务""" - message = { - 'action': 'register', - 'service_name': self.service_name, - 'internal_port': self.internal_port, - 'external_port': self.udp_sock.getsockname()[1], - 'client_id': self.client_id - } - print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'") - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待响应 - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] == 'success': - print(f"服务 '{self.service_name}' 注册成功") - else: - print(f"注册失败: {response['message']}") - - def udp_listener(self): - """监听UDP消息""" - while self.running: - try: - data, addr = self.udp_sock.recvfrom(4096) - print(f"收到来自 {addr} 的消息: {data.decode()}") - message = json.loads(data.decode()) - - if message.get('action') == 'punch' or message.get('action') == 'punch_check': - # 打洞请求 - 发送响应以确认连通性 - self.udp_sock.sendto(json.dumps( - {'action': 'punch_response', - 'client_id': message['client_id'] - }).encode(), addr) - print(f"收到来自 {addr} 的打洞请求,已响应") - elif message.get('action') == 'punch_response': - # 打洞响应 - 确认打洞成功 - print(f"收到来自 {addr} 的打洞响应") - elif message.get('action') == 'connect': - # 新的连接请求 - conn_id = message['conn_id'] - client_id = message['client_id'] - print(f"收到来自 {addr} 的连接请求") - threading.Thread( - target=self.handle_connection, - args=(conn_id, addr, client_id), - daemon=True - ).start() - elif message.get('action') == 'punch_request': - client_addr = message['client_addr'] - print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞") - self.punch_hole(message) - elif message.get('action') == 'data': - # 接收到来自客户端的数据 - conn_id = message['conn_id'] - data = bytes.fromhex(message['data']) - if conn_id in self.active_connections: - # 转发数据到本地服务 - print(f"收到来自 {addr} 的数据,转发到本地服务") - self.active_connections[conn_id]['local_sock'].sendall(data) - else: - print(f"收到来自 {addr} 的数据,但未找到对应的连接") - elif message.get('action') == 'stop_conn': - conn_id = message['conn_id'] - if conn_id in self.active_connections: - self.active_connections[conn_id]['local_sock'].close() - self.active_connections.pop(conn_id) - elif message.get('action') == 'stop_client': - client_id = message['client_id'] - for conn_id, conn_info in self.active_connections.items(): - if conn_info['client_id'] == client_id: - conn_info['local_sock'].close() - self.active_connections.pop(conn_id) - else: - print(f"收到未知消息: {message}") - except Exception as e: - print(e) - pass - - def handle_connection(self, conn_id, client_addr, client_id): - """处理来自客户端的连接""" - try: - # 接受本地服务连接 - local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - local_sock.connect(('127.0.0.1', self.internal_port)) - if not local_sock: - print("无法连接到本地服务") - return - - # 创建与客户端的UDP隧道 - print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}") - - # 存储连接 - self.active_connections[conn_id] = { - 'local_sock': local_sock, - 'client_addr': client_addr, - 'client_id': client_id - } - - # 通知客户端连接就绪 - self.udp_sock.sendto(json.dumps({ - 'action': 'connected', - 'client_id': conn_id - }).encode(), client_addr) - - # 启动数据转发 - self.forward_data(conn_id, local_sock, client_addr) - except Exception as e: - print(f"连接失败: {str(e)}") - self.udp_sock.sendto(json.dumps({ - 'action': 'connect_failed', - 'client_id': conn_id, - 'message': str(e) - }).encode(), client_addr) - - def forward_data(self, conn_id, local_sock, client_addr): - """转发TCP数据到UDP隧道""" - CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 - try: - while True: - # 从本地服务读取数据 - data = local_sock.recv(65535) - if not data: - break - - # 通过UDP发送给客户端 - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), client_addr) - print(f"转发数据给客户端{client_addr}") - # for i in range(0, len(data), CHUNK_SIZE): - # chunk = data[i:i + CHUNK_SIZE] - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': chunk.hex() - # }).encode(), client_addr) - except Exception as e: - print(f"转发数据失败: {str(e)}") - finally: - local_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] - print(f"连接 {conn_id} 已关闭") - - def run(self): - """运行服务提供端""" - # 注册服务 - self.register_service() - - # 启动UDP监听线程 - threading.Thread(target=self.udp_listener, daemon=True).start() - - # 保持主线程运行 - try: - while self.running: - time.sleep(1) - except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) - for conn_id, conn_info in self.active_connections.items(): - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), conn_info['client_addr']) - self.udp_sock.close() - print("服务提供端已停止") - - def punch_hole(self, message): - for i in range(5): - try: - self.udp_sock.sendto(json.dumps({ - 'action': 'punch', - 'client_id': message['client_id'] - }).encode(), tuple(message['client_addr'])) - time.sleep(0.2) - except Exception as e: - print(f"打洞失败: {str(e)}") - time.sleep(1) - - -if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "my_game_server" - INTERNAL_PORT = 5001 # 内网游戏服务器端口 - - provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) - provider.run()