diff --git a/connector1.py b/connector1.py index 373f70a..9c5fa32 100644 --- a/connector1.py +++ b/connector1.py @@ -79,7 +79,7 @@ class ServiceConnector: time.sleep(0.5) # 检查连通性 - self.udp_sock.settimeout(2.0) + self.udp_sock.settimeout(10.0) try: self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) data, addr = self.udp_sock.recvfrom(1024) @@ -99,7 +99,7 @@ class ServiceConnector: """监听UDP消息""" while self.running: try: - data, addr = self.udp_sock.recvfrom(4096) + data, addr = self.udp_sock.recvfrom(65535) message = json.loads(data.decode()) if message['action'] == 'punch_response': @@ -108,9 +108,22 @@ class ServiceConnector: elif message['action'] == 'data': if message['conn_id'] in self.active_connections: # 转发数据到本地客户端 - self.active_connections[message['conn_id']].sendall(bytes.fromhex(message['data'])) + print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}") + self.active_connections[message['conn_id']].send(bytes.fromhex(message['data'])) else: print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': message['conn_id'] + }).encode(), addr) + elif message['action'] == 'stop_conn': + # 停止连接 + if message['conn_id'] in self.active_connections: + self.active_connections[message['conn_id']].close() + del self.active_connections[message['conn_id']] + print(f"已关闭本地连接 {message['conn_id']}") + else: + print(f"收到未知消息: {message}") except Exception as e: print(e) @@ -130,6 +143,7 @@ class ServiceConnector: # 请求服务提供者建立连接 self.udp_sock.sendto(json.dumps({ 'action': 'connect', + 'client_id': self.client_id, 'conn_id': conn_id }).encode(), self.provider_addr) @@ -147,28 +161,29 @@ class ServiceConnector: def forward_data(self, conn_id, client_sock): """转发本地TCP数据到UDP隧道""" - CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小 + CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 try: while True: # 从本地客户端读取数据 - data = client_sock.recv(4096) + data = client_sock.recv(65535) if not data: break - # # 通过UDP发送给服务提供者 - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': data.hex() # 十六进制编码二进制数据 - # }).encode(), self.provider_addr) + # 通过UDP发送给服务提供者 + print(f"发送数据到服务提供者: {self.provider_addr}") + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'conn_id': conn_id, + 'data': data.hex() # 十六进制编码二进制数据 + }).encode(), self.provider_addr) # 分片发送数据 - for i in range(0, len(data), CHUNK_SIZE): - chunk = data[i:i + CHUNK_SIZE] - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': chunk.hex() - }).encode(), self.provider_addr) + # for i in range(0, len(data), CHUNK_SIZE): + # chunk = data[i:i + CHUNK_SIZE] + # self.udp_sock.sendto(json.dumps({ + # 'action': 'data', + # 'conn_id': conn_id, + # 'data': chunk.hex() + # }).encode(), self.provider_addr) except: pass finally: @@ -198,6 +213,10 @@ class ServiceConnector: time.sleep(1) except KeyboardInterrupt: self.running = False + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_client', + 'client_id': self.client_id + }).encode(), self.provider_addr) self.udp_sock.close() self.tcp_sock.close() print("服务连接端已停止") diff --git a/coordinator.py b/coordinator.py index 9f2fa8c..566aa32 100644 --- a/coordinator.py +++ b/coordinator.py @@ -117,6 +117,9 @@ class CoordinatorServer: response = self.handle_request(message, addr) elif action == 'punch_request': response = self.handle_punch_request(message, addr) + elif action == 'stop_provider': + self.services.pop(message['service_name'], None) + response = {'status': 'success', 'message': '服务停止成功'} else: response = {'status': 'error', 'message': '无效操作'} diff --git a/provider1.py b/provider1.py index 9baa207..2517164 100644 --- a/provider1.py +++ b/provider1.py @@ -68,10 +68,11 @@ class ServiceProvider: elif message.get('action') == 'connect': # 新的连接请求 conn_id = message['conn_id'] + client_id = message['client_id'] print(f"收到来自 {addr} 的连接请求") threading.Thread( target=self.handle_connection, - args=(conn_id, addr), + args=(conn_id, addr, client_id), daemon=True ).start() elif message.get('action') == 'punch_request': @@ -84,17 +85,28 @@ class ServiceProvider: data = bytes.fromhex(message['data']) if conn_id in self.active_connections: # 转发数据到本地服务 - print(f"收到来自 {addr} 的数据{data}\n,转发到本地服务") + print(f"收到来自 {addr} 的数据,转发到本地服务") self.active_connections[conn_id]['local_sock'].sendall(data) else: print(f"收到来自 {addr} 的数据,但未找到对应的连接") + elif message.get('action') == 'stop_conn': + conn_id = message['conn_id'] + if conn_id in self.active_connections: + self.active_connections[conn_id]['local_sock'].close() + self.active_connections.pop(conn_id) + elif message.get('action') == 'stop_client': + client_id = message['client_id'] + for conn_id, conn_info in self.active_connections.items(): + if conn_info['client_id'] == client_id: + conn_info['local_sock'].close() + self.active_connections.pop(conn_id) else: print(f"收到未知消息: {message}") except Exception as e: print(e) pass - def handle_connection(self, conn_id, client_addr): + def handle_connection(self, conn_id, client_addr, client_id): """处理来自客户端的连接""" try: # 接受本地服务连接 @@ -110,7 +122,8 @@ class ServiceProvider: # 存储连接 self.active_connections[conn_id] = { 'local_sock': local_sock, - 'client_addr': client_addr + 'client_addr': client_addr, + 'client_id': client_id } # 通知客户端连接就绪 @@ -131,27 +144,28 @@ class ServiceProvider: def forward_data(self, conn_id, local_sock, client_addr): """转发TCP数据到UDP隧道""" - CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小 + CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 try: while True: # 从本地服务读取数据 - data = local_sock.recv(4096) + data = local_sock.recv(65535) if not data: break - # # 通过UDP发送给客户端 - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': data.hex() # 十六进制编码二进制数据 - # }).encode(), client_addr) - for i in range(0, len(data), CHUNK_SIZE): - chunk = data[i:i + CHUNK_SIZE] - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': chunk.hex() - }).encode(), client_addr) + # 通过UDP发送给客户端 + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'conn_id': conn_id, + 'data': data.hex() # 十六进制编码二进制数据 + }).encode(), client_addr) + print(f"转发数据给客户端{client_addr}") + # for i in range(0, len(data), CHUNK_SIZE): + # chunk = data[i:i + CHUNK_SIZE] + # self.udp_sock.sendto(json.dumps({ + # 'action': 'data', + # 'conn_id': conn_id, + # 'data': chunk.hex() + # }).encode(), client_addr) except Exception as e: print(f"转发数据失败: {str(e)}") finally: @@ -174,6 +188,12 @@ class ServiceProvider: time.sleep(1) except KeyboardInterrupt: self.running = False + self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) + for conn_id, conn_info in self.active_connections.items(): + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': conn_id + }).encode(), conn_info['client_addr']) self.udp_sock.close() print("服务提供端已停止") @@ -184,7 +204,7 @@ class ServiceProvider: 'action': 'punch', 'client_id': message['client_id'] }).encode(), tuple(message['client_addr'])) - time.sleep(2) + time.sleep(0.2) except Exception as e: print(f"打洞失败: {str(e)}") time.sleep(1) @@ -192,9 +212,9 @@ class ServiceProvider: if __name__ == '__main__': # 配置信息 - COORDINATOR_ADDR = ('127.0.0.1', 5000) # 替换为公网服务器IP + COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP SERVICE_NAME = "my_game_server" - INTERNAL_PORT = 8888 # 内网游戏服务器端口 + INTERNAL_PORT = 5001 # 内网游戏服务器端口 provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) provider.run()