diff --git a/connector.py b/connector.py index 9c5fa32..e69de29 100644 --- a/connector.py +++ b/connector.py @@ -1,232 +0,0 @@ -import socket -import json -import threading -import time -import uuid - - -class ServiceConnector: - def __init__(self, coordinator_addr, service_name, local_port): - self.target_id = None - self.coordinator_addr = coordinator_addr - self.service_name = service_name - self.local_port = local_port - self.client_id = f"connector-{uuid.uuid4().hex[:8]}" - - # 创建UDP套接字用于协调通信 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) - - # 创建TCP套接字用于本地监听 - self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.tcp_sock.bind(('127.0.0.1', local_port)) - self.tcp_sock.listen(5) - print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") - - # 存储活动连接 - self.active_connections = {} - self.provider_addr = None - self.internal_port = None - self.running = True - - def request_service(self): - """向协调服务器请求服务""" - message = { - 'action': 'request', - 'service_name': self.service_name, - 'client_id': self.client_id - } - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待响应 - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] == 'success': - self.provider_addr = tuple(response['provider_addr']) - self.internal_port = response['internal_port'] - self.target_id = response['target_id'] - print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}") - return True - else: - print(f"服务请求失败: {response['message']}") - return False - - def punch_hole(self): - """执行UDP打洞""" - if not self.provider_addr: - return False - - # 请求打洞 - message = { - 'action': 'punch_request', - 'client_id': self.client_id, - 'target_id': self.target_id - } - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待协调服务器响应 - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] != 'success': - print(f"打洞请求失败: {response['message']}") - return False - - # 向服务提供者发送打洞包 - print(f"尝试打洞到 {self.provider_addr}...") - for _ in range(5): - self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr) - time.sleep(0.5) - - # 检查连通性 - self.udp_sock.settimeout(10.0) - try: - self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) - data, addr = self.udp_sock.recvfrom(1024) - if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: - print("打洞成功! 已建立UDP连接") - return True - else: - print(f"错误的打洞响应{data}") - return False - except socket.timeout: - print("打洞失败: 未收到响应") - return False - finally: - self.udp_sock.settimeout(None) - - def udp_listener(self): - """监听UDP消息""" - while self.running: - try: - data, addr = self.udp_sock.recvfrom(65535) - message = json.loads(data.decode()) - - if message['action'] == 'punch_response': - # 打洞响应 - 确认连通性 - print(f"收到来自 {addr} 的打洞响应") - elif message['action'] == 'data': - if message['conn_id'] in self.active_connections: - # 转发数据到本地客户端 - print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}") - self.active_connections[message['conn_id']].send(bytes.fromhex(message['data'])) - else: - print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': message['conn_id'] - }).encode(), addr) - elif message['action'] == 'stop_conn': - # 停止连接 - if message['conn_id'] in self.active_connections: - self.active_connections[message['conn_id']].close() - del self.active_connections[message['conn_id']] - print(f"已关闭本地连接 {message['conn_id']}") - else: - print(f"收到未知消息: {message}") - except Exception as e: - print(e) - - def tcp_listener(self): - """监听本地TCP连接""" - while self.running: - try: - client_sock, client_addr = self.tcp_sock.accept() - print(f"新的本地连接来自 {client_addr}") - - # 为每个连接生成唯一ID - conn_id = str(uuid.uuid4()) - - # 存储连接 - self.active_connections[conn_id] = client_sock - - # 请求服务提供者建立连接 - self.udp_sock.sendto(json.dumps({ - 'action': 'connect', - 'client_id': self.client_id, - 'conn_id': conn_id - }).encode(), self.provider_addr) - - time.sleep(0.5) - - # 启动数据转发线程 - threading.Thread( - target=self.forward_data, - args=(conn_id, client_sock), - daemon=True - ).start() - - except: - pass - - def forward_data(self, conn_id, client_sock): - """转发本地TCP数据到UDP隧道""" - CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 - try: - while True: - # 从本地客户端读取数据 - data = client_sock.recv(65535) - if not data: - break - - # 通过UDP发送给服务提供者 - print(f"发送数据到服务提供者: {self.provider_addr}") - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), self.provider_addr) - # 分片发送数据 - # for i in range(0, len(data), CHUNK_SIZE): - # chunk = data[i:i + CHUNK_SIZE] - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': chunk.hex() - # }).encode(), self.provider_addr) - except: - pass - finally: - client_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] - - def run(self): - """运行服务连接端""" - # 请求服务 - if not self.request_service(): - return - - # 执行打洞 - if not self.punch_hole(): - return - - # 启动UDP监听线程 - threading.Thread(target=self.udp_listener, daemon=True).start() - - # 启动TCP监听线程 - threading.Thread(target=self.tcp_listener, daemon=True).start() - - # 保持主线程运行 - try: - while self.running: - time.sleep(1) - except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_client', - 'client_id': self.client_id - }).encode(), self.provider_addr) - self.udp_sock.close() - self.tcp_sock.close() - print("服务连接端已停止") - - -if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "my_game_server" - LOCAL_PORT = 12345 # 本地映射端口 - - connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) - connector.run()