diff --git a/connector1.py b/connector1.py index 9c5fa32..769eb29 100644 --- a/connector1.py +++ b/connector1.py @@ -1,33 +1,142 @@ +# connector.py import socket import json import threading import time import uuid +import heapq +from collections import deque + + +class ReliableChannel: + """可靠传输通道类(与provider中的类似)""" + + def __init__(self, udp_sock, remote_addr): + self.udp_sock = udp_sock + self.remote_addr = remote_addr + self.send_buffer = deque() + self.recv_buffer = {} + self.expected_seq = 0 + self.last_ack = -1 + self.ack_interval = 0.2 + self.last_ack_time = 0 + self.window_size = 10 + self.retransmit_timers = {} + self.retransmit_timeout = 0.5 + self.running = True + + def send(self, data, is_control=False): + if not self.running: + return + + seq = self.expected_seq + packet = { + 'seq': seq, + 'data': data.hex(), + 'is_control': is_control, + 'timestamp': time.time() + } + + self.send_buffer.append(packet) + self.retransmit_timers[seq] = time.time() + self.expected_seq += 1 + self._send_from_buffer() + + def _send_from_buffer(self): + while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack: + self.send_buffer.popleft() + + for packet in list(self.send_buffer)[:self.window_size]: + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'packet': packet + }).encode(), self.remote_addr) + + def process_ack(self, ack_seq): + if ack_seq > self.last_ack: + self.last_ack = ack_seq + for seq in list(self.retransmit_timers.keys()): + if seq <= ack_seq: + self.retransmit_timers.pop(seq, None) + self._send_from_buffer() + + def process_packet(self, packet): + seq = packet['seq'] + data = bytes.fromhex(packet['data']) + + if time.time() - self.last_ack_time > self.ack_interval: + self.send_ack() + self.last_ack_time = time.time() + + if seq == self.expected_seq: + self.expected_seq += 1 + self._process_buffered() + return data + elif seq > self.expected_seq: + self.recv_buffer[seq] = data + return None + else: + return None + + def _process_buffered(self): + while self.expected_seq in self.recv_buffer: + data = self.recv_buffer.pop(self.expected_seq) + self.expected_seq += 1 + # TODO: 返回数据或交给上层处理 + + def send_ack(self): + ack_packet = { + 'action': 'ack', + 'ack_seq': self.expected_seq - 1, + 'window': self.window_size + } + self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr) + + def check_retransmit(self): + now = time.time() + for seq, send_time in list(self.retransmit_timers.items()): + if now - send_time > self.retransmit_timeout: + for packet in self.send_buffer: + if packet['seq'] == seq: + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'packet': packet + }).encode(), self.remote_addr) + self.retransmit_timers[seq] = now + break + + def close(self): + self.running = False + self.send_buffer.clear() + self.recv_buffer.clear() + self.retransmit_timers.clear() class ServiceConnector: def __init__(self, coordinator_addr, service_name, local_port): - self.target_id = None self.coordinator_addr = coordinator_addr self.service_name = service_name self.local_port = local_port self.client_id = f"connector-{uuid.uuid4().hex[:8]}" - # 创建UDP套接字用于协调通信 + # 创建UDP套接字 self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区 + self.udp_sock.settimeout(0.1) - # 创建TCP套接字用于本地监听 + # 创建TCP监听套接字 self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp_sock.bind(('127.0.0.1', local_port)) - self.tcp_sock.listen(5) + self.tcp_sock.listen(10) # 增加监听队列 print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") - # 存储活动连接 + # 连接状态 self.active_connections = {} + self.reliable_channels = {} self.provider_addr = None self.internal_port = None + self.target_id = None self.running = True def request_service(self): @@ -54,9 +163,6 @@ class ServiceConnector: def punch_hole(self): """执行UDP打洞""" - if not self.provider_addr: - return False - # 请求打洞 message = { 'action': 'punch_request', @@ -74,70 +180,99 @@ class ServiceConnector: # 向服务提供者发送打洞包 print(f"尝试打洞到 {self.provider_addr}...") - for _ in range(5): - self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr) - time.sleep(0.5) + for _ in range(10): # 增加打洞尝试次数 + self.udp_sock.sendto(b'PUNCH', self.provider_addr) # 发送原始UDP包 + time.sleep(0.2) # 检查连通性 - self.udp_sock.settimeout(10.0) + self.udp_sock.settimeout(5.0) try: self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) - data, addr = self.udp_sock.recvfrom(1024) - if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: + data, addr = self.udp_sock.recvfrom(4096) + if addr == self.provider_addr: print("打洞成功! 已建立UDP连接") return True else: - print(f"错误的打洞响应{data}") + print(f"错误的响应来源: {addr}") return False except socket.timeout: print("打洞失败: 未收到响应") return False finally: - self.udp_sock.settimeout(None) + self.udp_sock.settimeout(0.1) def udp_listener(self): """监听UDP消息""" while self.running: try: data, addr = self.udp_sock.recvfrom(65535) - message = json.loads(data.decode()) + try: + message = json.loads(data.decode()) + action = message.get('action') - if message['action'] == 'punch_response': - # 打洞响应 - 确认连通性 - print(f"收到来自 {addr} 的打洞响应") - elif message['action'] == 'data': - if message['conn_id'] in self.active_connections: - # 转发数据到本地客户端 - print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}") - self.active_connections[message['conn_id']].send(bytes.fromhex(message['data'])) - else: - print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': message['conn_id'] - }).encode(), addr) - elif message['action'] == 'stop_conn': - # 停止连接 - if message['conn_id'] in self.active_connections: - self.active_connections[message['conn_id']].close() - del self.active_connections[message['conn_id']] - print(f"已关闭本地连接 {message['conn_id']}") - else: - print(f"收到未知消息: {message}") + if action == 'punch_response': + # 打洞响应 + pass + elif action == 'connected': + # 连接建立成功 + conn_id = message['conn_id'] + if conn_id in self.active_connections: + # 创建可靠通道 + channel = ReliableChannel(self.udp_sock, self.provider_addr) + self.reliable_channels[conn_id] = channel + + # 启动通道监控 + threading.Thread( + target=self.monitor_channel, + args=(conn_id, channel), + daemon=True + ).start() + elif action == 'data': + # 处理数据包 + conn_id = message.get('conn_id') + if conn_id and conn_id in self.reliable_channels: + channel = self.reliable_channels[conn_id] + packet = message['packet'] + + # 处理数据包 + data_chunk = channel.process_packet(packet) + if data_chunk and conn_id in self.active_connections: + # 转发数据到本地客户端 + self.active_connections[conn_id].send(data_chunk) + elif action == 'ack': + # 处理ACK确认 + conn_id = message.get('conn_id') + if conn_id and conn_id in self.reliable_channels: + channel = self.reliable_channels[conn_id] + ack_seq = message['ack_seq'] + channel.process_ack(ack_seq) + elif action == 'stop_conn': + conn_id = message['conn_id'] + self.close_connection(conn_id) + elif action == 'connect_failed': + conn_id = message['conn_id'] + print(f"连接失败: {message['message']}") + self.close_connection(conn_id) + except json.JSONDecodeError: + # 原始UDP包可能是打洞确认 + pass + except Exception as e: + print(f"UDP监听错误: {str(e)}") + except socket.timeout: + pass except Exception as e: - print(e) + print(f"UDP监听异常: {str(e)}") def tcp_listener(self): """监听本地TCP连接""" while self.running: try: client_sock, client_addr = self.tcp_sock.accept() + client_sock.settimeout(10.0) # 设置超时 print(f"新的本地连接来自 {client_addr}") - # 为每个连接生成唯一ID + # 为连接生成唯一ID conn_id = str(uuid.uuid4()) - - # 存储连接 self.active_connections[conn_id] = client_sock # 请求服务提供者建立连接 @@ -147,86 +282,105 @@ class ServiceConnector: 'conn_id': conn_id }).encode(), self.provider_addr) - time.sleep(0.5) - - # 启动数据转发线程 + # 启动数据转发 threading.Thread( target=self.forward_data, args=(conn_id, client_sock), daemon=True ).start() - - except: + except socket.timeout: pass + except Exception as e: + print(f"TCP监听错误: {str(e)}") def forward_data(self, conn_id, client_sock): - """转发本地TCP数据到UDP隧道""" - CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 + """转发本地TCP数据到UDP通道""" try: - while True: + while conn_id in self.active_connections: # 从本地客户端读取数据 - data = client_sock.recv(65535) + data = client_sock.recv(32768) # 32KB数据块 if not data: break - # 通过UDP发送给服务提供者 - print(f"发送数据到服务提供者: {self.provider_addr}") - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), self.provider_addr) - # 分片发送数据 - # for i in range(0, len(data), CHUNK_SIZE): - # chunk = data[i:i + CHUNK_SIZE] - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': chunk.hex() - # }).encode(), self.provider_addr) - except: + # 通过可靠通道发送 + if conn_id in self.reliable_channels: + channel = self.reliable_channels[conn_id] + channel.send(data) + except socket.timeout: pass + except Exception as e: + print(f"数据转发错误: {str(e)}") finally: - client_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] + self.close_connection(conn_id) + + def monitor_channel(self, conn_id, channel): + """监控通道状态和重传""" + while conn_id in self.reliable_channels: + try: + channel.check_retransmit() + # 发送定期ACK + if time.time() - channel.last_ack_time > channel.ack_interval: + channel.send_ack() + channel.last_ack_time = time.time() + time.sleep(0.1) + except Exception as e: + print(f"通道监控错误: {str(e)}") + break + + def close_connection(self, conn_id): + """关闭指定连接""" + if conn_id in self.active_connections: + sock = self.active_connections.pop(conn_id) + sock.close() + if conn_id in self.reliable_channels: + channel = self.reliable_channels.pop(conn_id) + channel.close() + + # 通知服务提供端关闭连接 + if self.provider_addr: + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_conn', + 'conn_id': conn_id + }).encode(), self.provider_addr) + + print(f"连接 {conn_id} 已关闭") def run(self): """运行服务连接端""" - # 请求服务 if not self.request_service(): return - - # 执行打洞 if not self.punch_hole(): return - # 启动UDP监听线程 threading.Thread(target=self.udp_listener, daemon=True).start() - - # 启动TCP监听线程 threading.Thread(target=self.tcp_listener, daemon=True).start() - # 保持主线程运行 try: while self.running: time.sleep(1) except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_client', - 'client_id': self.client_id - }).encode(), self.provider_addr) - self.udp_sock.close() - self.tcp_sock.close() - print("服务连接端已停止") + self.shutdown() + + def shutdown(self): + """关闭服务连接端""" + self.running = False + # 通知服务提供端停止所有连接 + self.udp_sock.sendto(json.dumps({ + 'action': 'stop_client', + 'client_id': self.client_id + }).encode(), self.provider_addr) + # 关闭所有连接 + for conn_id in list(self.active_connections.keys()): + self.close_connection(conn_id) + self.udp_sock.close() + self.tcp_sock.close() + print("服务连接端已停止") if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "my_game_server" - LOCAL_PORT = 12345 # 本地映射端口 + COORDINATOR_ADDR = ('www.awin-x.top', 5000) + SERVICE_NAME = "my_service" + LOCAL_PORT = 12345 connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) - connector.run() + connector.run() \ No newline at end of file diff --git a/coordinator.py b/coordinator.py index 566aa32..184b3f3 100644 --- a/coordinator.py +++ b/coordinator.py @@ -12,6 +12,7 @@ class CoordinatorServer: self.clients = defaultdict(dict) # 客户端ID -> 信息 self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock.bind((host, port)) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 增加接收缓冲区 self.running = True print(f"协调服务器运行在 {host}:{port}") @@ -59,8 +60,8 @@ class CoordinatorServer: target_id = [ client_id for client_id, info in self.clients.items() - if info['service_name'] == service_name - ][0] + if info.get('service_name') == service_name + ][0] # 简化处理,取第一个 print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") @@ -106,7 +107,7 @@ class CoordinatorServer: print("协调服务器已启动,等待连接...") while self.running: try: - data, addr = self.udp_sock.recvfrom(4096) + data, addr = self.udp_sock.recvfrom(65535) # 支持更大的数据包 try: message = json.loads(data.decode()) action = message.get('action') @@ -137,4 +138,4 @@ class CoordinatorServer: if __name__ == '__main__': server = CoordinatorServer() - server.run() + server.run() \ No newline at end of file diff --git a/provider1.py b/provider1.py index 2517164..3f323e9 100644 --- a/provider1.py +++ b/provider1.py @@ -3,6 +3,133 @@ import json import threading import time import uuid +from collections import deque + + +class ReliableChannel: + """可靠传输通道类,实现序列号、ACK确认和重传""" + + def __init__(self, udp_sock, remote_addr): + self.udp_sock = udp_sock + self.remote_addr = remote_addr + self.send_buffer = deque() # 发送缓冲区 + self.recv_buffer = {} # 接收缓冲区 (seq -> data) + self.expected_seq = 0 # 期望的下一个序列号 + self.last_ack = -1 # 最后确认的序列号 + self.ack_interval = 0.2 # ACK发送间隔 (秒) + self.last_ack_time = 0 + self.window_size = 10 # 滑动窗口大小 + self.retransmit_timers = {} # 重传计时器 {seq: send_time} + self.retransmit_timeout = 0.5 # 重传超时时间 (秒) + self.running = True + + def send(self, data, is_control=False): + """发送数据并管理发送缓冲区""" + if not self.running: + return + + # 创建数据包 + seq = self.expected_seq + packet = { + 'seq': seq, + 'data': data.hex(), + 'is_control': is_control, + 'timestamp': time.time() + } + + # 添加到发送缓冲区 + self.send_buffer.append(packet) + self.retransmit_timers[seq] = time.time() + self.expected_seq += 1 + + # 如果窗口有空闲,立即发送 + self._send_from_buffer() + + def _send_from_buffer(self): + """从发送缓冲区发送数据包 (按照滑动窗口)""" + # 移除已确认的数据包 + while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack: + self.send_buffer.popleft() + + # 发送窗口中的数据 + for packet in list(self.send_buffer)[:self.window_size]: + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'packet': packet + }).encode(), self.remote_addr) + + def process_ack(self, ack_seq): + """处理接收到的ACK""" + if ack_seq > self.last_ack: + self.last_ack = ack_seq + # 清除重传计时器 + for seq in list(self.retransmit_timers.keys()): + if seq <= ack_seq: + self.retransmit_timers.pop(seq, None) + # 尝试发送更多数据 + self._send_from_buffer() + + def process_packet(self, packet): + """处理接收到的数据包""" + seq = packet['seq'] + data = bytes.fromhex(packet['data']) + + # 发送ACK + if time.time() - self.last_ack_time > self.ack_interval: + self.send_ack() + self.last_ack_time = time.time() + + # 按序接收 + if seq == self.expected_seq: + self.expected_seq += 1 + # 处理之前缓存的数据 + self._process_buffered() + return data + # 乱序接收,缓存数据 + elif seq > self.expected_seq: + self.recv_buffer[seq] = data + return None + # 重复数据包 + else: + return None + + def _process_buffered(self): + """处理接收缓冲区中的连续数据""" + while self.expected_seq in self.recv_buffer: + data = self.recv_buffer.pop(self.expected_seq) + self.expected_seq += 1 + # TODO: 返回数据或交给上层处理 + + def send_ack(self): + """发送ACK确认""" + ack_packet = { + 'action': 'ack', + 'ack_seq': self.expected_seq - 1, # 确认到目前收到的最大连续序列号 + 'window': self.window_size + } + self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr) + + def check_retransmit(self): + """检查需要重传的数据包""" + now = time.time() + for seq, send_time in list(self.retransmit_timers.items()): + if now - send_time > self.retransmit_timeout: + # 重传数据包 + for packet in self.send_buffer: + if packet['seq'] == seq: + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'packet': packet + }).encode(), self.remote_addr) + self.retransmit_timers[seq] = now + break + + def close(self): + """关闭通道""" + self.running = False + self.send_buffer.clear() + self.recv_buffer.clear() + self.retransmit_timers.clear() class ServiceProvider: @@ -12,19 +139,16 @@ class ServiceProvider: self.internal_port = internal_port self.client_id = f"provider-{uuid.uuid4().hex[:8]}" - # 创建UDP套接字用于协调通信 + # 创建UDP套接字 self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区 + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.punch_port = self.udp_sock.getsockname()[1] - # # 创建TCP套接字用于服务监听 - # self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # self.tcp_sock.bind(('0.0.0.0', internal_port)) - # self.tcp_sock.listen(5) - print(f"服务端监听在端口 {internal_port}") - - # 存储活动连接 + # 存储连接和可靠通道 self.active_connections = {} + self.reliable_channels = {} # conn_id -> ReliableChannel self.running = True def register_service(self): @@ -51,170 +175,200 @@ class ServiceProvider: """监听UDP消息""" while self.running: try: - data, addr = self.udp_sock.recvfrom(4096) - print(f"收到来自 {addr} 的消息: {data.decode()}") - message = json.loads(data.decode()) + data, addr = self.udp_sock.recvfrom(65535) + try: + message = json.loads(data.decode()) + action = message.get('action') - if message.get('action') == 'punch' or message.get('action') == 'punch_check': - # 打洞请求 - 发送响应以确认连通性 - self.udp_sock.sendto(json.dumps( - {'action': 'punch_response', - 'client_id': message['client_id'] - }).encode(), addr) - print(f"收到来自 {addr} 的打洞请求,已响应") - elif message.get('action') == 'punch_response': - # 打洞响应 - 确认打洞成功 - print(f"收到来自 {addr} 的打洞响应") - elif message.get('action') == 'connect': - # 新的连接请求 - conn_id = message['conn_id'] - client_id = message['client_id'] - print(f"收到来自 {addr} 的连接请求") - threading.Thread( - target=self.handle_connection, - args=(conn_id, addr, client_id), - daemon=True - ).start() - elif message.get('action') == 'punch_request': - client_addr = message['client_addr'] - print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞") - self.punch_hole(message) - elif message.get('action') == 'data': - # 接收到来自客户端的数据 - conn_id = message['conn_id'] - data = bytes.fromhex(message['data']) - if conn_id in self.active_connections: - # 转发数据到本地服务 - print(f"收到来自 {addr} 的数据,转发到本地服务") - self.active_connections[conn_id]['local_sock'].sendall(data) - else: - print(f"收到来自 {addr} 的数据,但未找到对应的连接") - elif message.get('action') == 'stop_conn': - conn_id = message['conn_id'] - if conn_id in self.active_connections: - self.active_connections[conn_id]['local_sock'].close() - self.active_connections.pop(conn_id) - elif message.get('action') == 'stop_client': - client_id = message['client_id'] - for conn_id, conn_info in self.active_connections.items(): - if conn_info['client_id'] == client_id: - conn_info['local_sock'].close() - self.active_connections.pop(conn_id) - else: - print(f"收到未知消息: {message}") - except Exception as e: - print(e) + if action == 'punch' or action == 'punch_check': + # 打洞请求 - 发送响应 + self.udp_sock.sendto(json.dumps( + {'action': 'punch_response', 'client_id': self.client_id} + ).encode(), addr) + elif action == 'punch_response': + # 打洞响应 + pass + elif action == 'connect': + # 新的连接请求 + conn_id = message['conn_id'] + client_id = message['client_id'] + threading.Thread( + target=self.handle_connection, + args=(conn_id, addr, client_id), + daemon=True + ).start() + elif action == 'punch_request': + # 从协调服务器收到的打洞请求 + self.punch_hole(message) + elif action == 'data': + # 处理数据包 + conn_id = message.get('conn_id') + if conn_id and conn_id in self.reliable_channels: + channel = self.reliable_channels[conn_id] + packet = message['packet'] + + # 处理数据包 + data_chunk = channel.process_packet(packet) + if data_chunk and conn_id in self.active_connections: + # 转发数据到本地服务 + self.active_connections[conn_id]['local_sock'].sendall(data_chunk) + elif action == 'ack': + # 处理ACK确认 + conn_id = message.get('conn_id') + if conn_id and conn_id in self.reliable_channels: + channel = self.reliable_channels[conn_id] + ack_seq = message['ack_seq'] + channel.process_ack(ack_seq) + elif action == 'stop_conn': + conn_id = message['conn_id'] + self.close_connection(conn_id) + elif action == 'stop_client': + client_id = message['client_id'] + self.close_client_connections(client_id) + except json.JSONDecodeError: + # 非JSON数据可能是打洞包 + pass + except Exception as e: + print(f"UDP监听错误: {str(e)}") + except socket.timeout: pass + except Exception as e: + print(f"UDP监听异常: {str(e)}") def handle_connection(self, conn_id, client_addr, client_id): """处理来自客户端的连接""" try: - # 接受本地服务连接 + # 连接到本地服务 local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + local_sock.settimeout(5.0) local_sock.connect(('127.0.0.1', self.internal_port)) - if not local_sock: - print("无法连接到本地服务") - return - # 创建与客户端的UDP隧道 - print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}") + # 创建可靠通道 + channel = ReliableChannel(self.udp_sock, client_addr) + self.reliable_channels[conn_id] = channel # 存储连接 self.active_connections[conn_id] = { 'local_sock': local_sock, 'client_addr': client_addr, - 'client_id': client_id + 'client_id': client_id, + 'channel': channel } # 通知客户端连接就绪 self.udp_sock.sendto(json.dumps({ 'action': 'connected', - 'client_id': conn_id + 'conn_id': conn_id }).encode(), client_addr) - # 启动数据转发 - self.forward_data(conn_id, local_sock, client_addr) + # 启动数据转发和通道监控 + threading.Thread( + target=self.forward_data, + args=(conn_id, local_sock, channel), + daemon=True + ).start() + + threading.Thread( + target=self.monitor_channel, + args=(conn_id, channel), + daemon=True + ).start() + + print(f"建立连接 {conn_id}: {client_addr} -> 本地服务") except Exception as e: print(f"连接失败: {str(e)}") self.udp_sock.sendto(json.dumps({ 'action': 'connect_failed', - 'client_id': conn_id, + 'conn_id': conn_id, 'message': str(e) }).encode(), client_addr) - def forward_data(self, conn_id, local_sock, client_addr): - """转发TCP数据到UDP隧道""" - CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小 + def forward_data(self, conn_id, local_sock, channel): + """转发TCP数据到UDP通道""" try: - while True: + while conn_id in self.active_connections: # 从本地服务读取数据 - data = local_sock.recv(65535) + data = local_sock.recv(32768) # 32KB 数据块 if not data: break - # 通过UDP发送给客户端 - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), client_addr) - print(f"转发数据给客户端{client_addr}") - # for i in range(0, len(data), CHUNK_SIZE): - # chunk = data[i:i + CHUNK_SIZE] - # self.udp_sock.sendto(json.dumps({ - # 'action': 'data', - # 'conn_id': conn_id, - # 'data': chunk.hex() - # }).encode(), client_addr) + # 通过可靠通道发送 + channel.send(data) + except socket.timeout: + pass except Exception as e: - print(f"转发数据失败: {str(e)}") + print(f"数据转发错误: {str(e)}") finally: - local_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] + self.close_connection(conn_id) + + def monitor_channel(self, conn_id, channel): + """监控通道状态和重传""" + while conn_id in self.active_connections: + try: + channel.check_retransmit() + # 发送定期ACK + if time.time() - channel.last_ack_time > channel.ack_interval: + channel.send_ack() + channel.last_ack_time = time.time() + time.sleep(0.1) + except Exception as e: + print(f"通道监控错误: {str(e)}") + + def close_connection(self, conn_id): + """关闭指定连接""" + if conn_id in self.active_connections: + conn = self.active_connections.pop(conn_id) + conn['local_sock'].close() + if conn_id in self.reliable_channels: + channel = self.reliable_channels.pop(conn_id) + channel.close() print(f"连接 {conn_id} 已关闭") + def close_client_connections(self, client_id): + """关闭指定客户端的连接""" + for conn_id, info in list(self.active_connections.items()): + if info['client_id'] == client_id: + self.close_connection(conn_id) + def run(self): """运行服务提供端""" - # 注册服务 self.register_service() - - # 启动UDP监听线程 threading.Thread(target=self.udp_listener, daemon=True).start() - # 保持主线程运行 try: while self.running: time.sleep(1) except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) - for conn_id, conn_info in self.active_connections.items(): - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), conn_info['client_addr']) - self.udp_sock.close() - print("服务提供端已停止") + self.shutdown() def punch_hole(self, message): + """执行打洞操作""" for i in range(5): try: self.udp_sock.sendto(json.dumps({ 'action': 'punch', - 'client_id': message['client_id'] + 'client_id': self.client_id }).encode(), tuple(message['client_addr'])) time.sleep(0.2) except Exception as e: print(f"打洞失败: {str(e)}") - time.sleep(1) + + def shutdown(self): + """关闭服务提供端""" + self.running = False + # 通知协调服务器停止服务 + self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) + # 关闭所有连接 + for conn_id in list(self.active_connections.keys()): + self.close_connection(conn_id) + self.udp_sock.close() + print("服务提供端已停止") if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "my_game_server" - INTERNAL_PORT = 5001 # 内网游戏服务器端口 + COORDINATOR_ADDR = ('www.awin-x.top', 5000) + SERVICE_NAME = "my_service" + INTERNAL_PORT = 5001 provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) - provider.run() + provider.run() \ No newline at end of file