diff --git a/connector1.py b/connector1.py index 769eb29..8152cc2 100644 --- a/connector1.py +++ b/connector1.py @@ -16,6 +16,7 @@ class ReliableChannel: self.remote_addr = remote_addr self.send_buffer = deque() self.recv_buffer = {} + self.recv_queue = deque() # 接收队列 self.expected_seq = 0 self.last_ack = -1 self.ack_interval = 0.2 @@ -82,7 +83,11 @@ class ReliableChannel: while self.expected_seq in self.recv_buffer: data = self.recv_buffer.pop(self.expected_seq) self.expected_seq += 1 - # TODO: 返回数据或交给上层处理 + self.recv_queue.append(data) # 添加到队列 + + def get_received_data(self): + """从接收队列中取出数据""" + return self.recv_queue.popleft() if self.recv_queue else None def send_ack(self): ack_packet = { @@ -227,6 +232,8 @@ class ServiceConnector: args=(conn_id, channel), daemon=True ).start() + + elif action == 'data': # 处理数据包 conn_id = message.get('conn_id') @@ -288,11 +295,28 @@ class ServiceConnector: args=(conn_id, client_sock), daemon=True ).start() + + threading.Thread( + target=self.forward_incoming_data, + args=(conn_id, client_sock), + daemon=True + ).start() + except socket.timeout: pass except Exception as e: print(f"TCP监听错误: {str(e)}") + def forward_incoming_data(self, conn_id, local_sock): + while conn_id in self.active_connections: + if conn_id in self.reliable_channels: + channel = self.reliable_channels[conn_id] + data = channel.get_received_data() + if data: + local_sock.sendall(data) + else: + time.sleep(0.1) + def forward_data(self, conn_id, client_sock): """转发本地TCP数据到UDP通道""" try: diff --git a/provider1.py b/provider1.py index 3f323e9..d248609 100644 --- a/provider1.py +++ b/provider1.py @@ -14,6 +14,7 @@ class ReliableChannel: self.remote_addr = remote_addr self.send_buffer = deque() # 发送缓冲区 self.recv_buffer = {} # 接收缓冲区 (seq -> data) + self.recv_queue = deque() # 接收队列 self.expected_seq = 0 # 期望的下一个序列号 self.last_ack = -1 # 最后确认的序列号 self.ack_interval = 0.2 # ACK发送间隔 (秒) @@ -98,7 +99,11 @@ class ReliableChannel: while self.expected_seq in self.recv_buffer: data = self.recv_buffer.pop(self.expected_seq) self.expected_seq += 1 - # TODO: 返回数据或交给上层处理 + self.recv_queue.append(data) # 添加到队列 + + def get_received_data(self): + """从接收队列中取出数据""" + return self.recv_queue.popleft() if self.recv_queue else None def send_ack(self): """发送ACK确认""" @@ -268,6 +273,12 @@ class ServiceProvider: daemon=True ).start() + threading.Thread( + target=self.forward_incoming_data, + args=(conn_id, channel, local_sock), + daemon=True + ).start() + threading.Thread( target=self.monitor_channel, args=(conn_id, channel), @@ -283,6 +294,14 @@ class ServiceProvider: 'message': str(e) }).encode(), client_addr) + def forward_incoming_data(self, conn_id, channel, local_sock): + while conn_id in self.active_connections: + data = channel.get_received_data() + if data: + local_sock.sendall(data) + else: + time.sleep(0.02) + def forward_data(self, conn_id, local_sock, channel): """转发TCP数据到UDP通道""" try: