提高稳定性

This commit is contained in:
awin-x 2025-05-31 15:45:31 +08:00
parent 0f8b92f7f8
commit 2aae52fb69
3 changed files with 520 additions and 211 deletions

View File

@ -1,33 +1,142 @@
# connector.py
import socket import socket
import json import json
import threading import threading
import time import time
import uuid import uuid
import heapq
from collections import deque
class ReliableChannel:
"""可靠传输通道类与provider中的类似"""
def __init__(self, udp_sock, remote_addr):
self.udp_sock = udp_sock
self.remote_addr = remote_addr
self.send_buffer = deque()
self.recv_buffer = {}
self.expected_seq = 0
self.last_ack = -1
self.ack_interval = 0.2
self.last_ack_time = 0
self.window_size = 10
self.retransmit_timers = {}
self.retransmit_timeout = 0.5
self.running = True
def send(self, data, is_control=False):
if not self.running:
return
seq = self.expected_seq
packet = {
'seq': seq,
'data': data.hex(),
'is_control': is_control,
'timestamp': time.time()
}
self.send_buffer.append(packet)
self.retransmit_timers[seq] = time.time()
self.expected_seq += 1
self._send_from_buffer()
def _send_from_buffer(self):
while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack:
self.send_buffer.popleft()
for packet in list(self.send_buffer)[:self.window_size]:
self.udp_sock.sendto(json.dumps({
'action': 'data',
'packet': packet
}).encode(), self.remote_addr)
def process_ack(self, ack_seq):
if ack_seq > self.last_ack:
self.last_ack = ack_seq
for seq in list(self.retransmit_timers.keys()):
if seq <= ack_seq:
self.retransmit_timers.pop(seq, None)
self._send_from_buffer()
def process_packet(self, packet):
seq = packet['seq']
data = bytes.fromhex(packet['data'])
if time.time() - self.last_ack_time > self.ack_interval:
self.send_ack()
self.last_ack_time = time.time()
if seq == self.expected_seq:
self.expected_seq += 1
self._process_buffered()
return data
elif seq > self.expected_seq:
self.recv_buffer[seq] = data
return None
else:
return None
def _process_buffered(self):
while self.expected_seq in self.recv_buffer:
data = self.recv_buffer.pop(self.expected_seq)
self.expected_seq += 1
# TODO: 返回数据或交给上层处理
def send_ack(self):
ack_packet = {
'action': 'ack',
'ack_seq': self.expected_seq - 1,
'window': self.window_size
}
self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr)
def check_retransmit(self):
now = time.time()
for seq, send_time in list(self.retransmit_timers.items()):
if now - send_time > self.retransmit_timeout:
for packet in self.send_buffer:
if packet['seq'] == seq:
self.udp_sock.sendto(json.dumps({
'action': 'data',
'packet': packet
}).encode(), self.remote_addr)
self.retransmit_timers[seq] = now
break
def close(self):
self.running = False
self.send_buffer.clear()
self.recv_buffer.clear()
self.retransmit_timers.clear()
class ServiceConnector: class ServiceConnector:
def __init__(self, coordinator_addr, service_name, local_port): def __init__(self, coordinator_addr, service_name, local_port):
self.target_id = None
self.coordinator_addr = coordinator_addr self.coordinator_addr = coordinator_addr
self.service_name = service_name self.service_name = service_name
self.local_port = local_port self.local_port = local_port
self.client_id = f"connector-{uuid.uuid4().hex[:8]}" self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信 # 创建UDP套接字
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0)) self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区
self.udp_sock.settimeout(0.1)
# 创建TCP套接字用于本地监听 # 创建TCP监听套接字
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_sock.bind(('127.0.0.1', local_port)) self.tcp_sock.bind(('127.0.0.1', local_port))
self.tcp_sock.listen(5) self.tcp_sock.listen(10) # 增加监听队列
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
# 存储活动连接 # 连接状态
self.active_connections = {} self.active_connections = {}
self.reliable_channels = {}
self.provider_addr = None self.provider_addr = None
self.internal_port = None self.internal_port = None
self.target_id = None
self.running = True self.running = True
def request_service(self): def request_service(self):
@ -54,9 +163,6 @@ class ServiceConnector:
def punch_hole(self): def punch_hole(self):
"""执行UDP打洞""" """执行UDP打洞"""
if not self.provider_addr:
return False
# 请求打洞 # 请求打洞
message = { message = {
'action': 'punch_request', 'action': 'punch_request',
@ -74,70 +180,99 @@ class ServiceConnector:
# 向服务提供者发送打洞包 # 向服务提供者发送打洞包
print(f"尝试打洞到 {self.provider_addr}...") print(f"尝试打洞到 {self.provider_addr}...")
for _ in range(5): for _ in range(10): # 增加打洞尝试次数
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr) self.udp_sock.sendto(b'PUNCH', self.provider_addr) # 发送原始UDP包
time.sleep(0.5) time.sleep(0.2)
# 检查连通性 # 检查连通性
self.udp_sock.settimeout(10.0) self.udp_sock.settimeout(5.0)
try: try:
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
data, addr = self.udp_sock.recvfrom(1024) data, addr = self.udp_sock.recvfrom(4096)
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: if addr == self.provider_addr:
print("打洞成功! 已建立UDP连接") print("打洞成功! 已建立UDP连接")
return True return True
else: else:
print(f"错误的打洞响应{data}") print(f"错误的响应来源: {addr}")
return False return False
except socket.timeout: except socket.timeout:
print("打洞失败: 未收到响应") print("打洞失败: 未收到响应")
return False return False
finally: finally:
self.udp_sock.settimeout(None) self.udp_sock.settimeout(0.1)
def udp_listener(self): def udp_listener(self):
"""监听UDP消息""" """监听UDP消息"""
while self.running: while self.running:
try: try:
data, addr = self.udp_sock.recvfrom(65535) data, addr = self.udp_sock.recvfrom(65535)
message = json.loads(data.decode()) try:
message = json.loads(data.decode())
action = message.get('action')
if message['action'] == 'punch_response': if action == 'punch_response':
# 打洞响应 - 确认连通性 # 打洞响应
print(f"收到来自 {addr} 的打洞响应") pass
elif message['action'] == 'data': elif action == 'connected':
if message['conn_id'] in self.active_connections: # 连接建立成功
# 转发数据到本地客户端 conn_id = message['conn_id']
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}") if conn_id in self.active_connections:
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data'])) # 创建可靠通道
else: channel = ReliableChannel(self.udp_sock, self.provider_addr)
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") self.reliable_channels[conn_id] = channel
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn', # 启动通道监控
'conn_id': message['conn_id'] threading.Thread(
}).encode(), addr) target=self.monitor_channel,
elif message['action'] == 'stop_conn': args=(conn_id, channel),
# 停止连接 daemon=True
if message['conn_id'] in self.active_connections: ).start()
self.active_connections[message['conn_id']].close() elif action == 'data':
del self.active_connections[message['conn_id']] # 处理数据包
print(f"已关闭本地连接 {message['conn_id']}") conn_id = message.get('conn_id')
else: if conn_id and conn_id in self.reliable_channels:
print(f"收到未知消息: {message}") channel = self.reliable_channels[conn_id]
packet = message['packet']
# 处理数据包
data_chunk = channel.process_packet(packet)
if data_chunk and conn_id in self.active_connections:
# 转发数据到本地客户端
self.active_connections[conn_id].send(data_chunk)
elif action == 'ack':
# 处理ACK确认
conn_id = message.get('conn_id')
if conn_id and conn_id in self.reliable_channels:
channel = self.reliable_channels[conn_id]
ack_seq = message['ack_seq']
channel.process_ack(ack_seq)
elif action == 'stop_conn':
conn_id = message['conn_id']
self.close_connection(conn_id)
elif action == 'connect_failed':
conn_id = message['conn_id']
print(f"连接失败: {message['message']}")
self.close_connection(conn_id)
except json.JSONDecodeError:
# 原始UDP包可能是打洞确认
pass
except Exception as e:
print(f"UDP监听错误: {str(e)}")
except socket.timeout:
pass
except Exception as e: except Exception as e:
print(e) print(f"UDP监听异常: {str(e)}")
def tcp_listener(self): def tcp_listener(self):
"""监听本地TCP连接""" """监听本地TCP连接"""
while self.running: while self.running:
try: try:
client_sock, client_addr = self.tcp_sock.accept() client_sock, client_addr = self.tcp_sock.accept()
client_sock.settimeout(10.0) # 设置超时
print(f"新的本地连接来自 {client_addr}") print(f"新的本地连接来自 {client_addr}")
# 为每个连接生成唯一ID # 为连接生成唯一ID
conn_id = str(uuid.uuid4()) conn_id = str(uuid.uuid4())
# 存储连接
self.active_connections[conn_id] = client_sock self.active_connections[conn_id] = client_sock
# 请求服务提供者建立连接 # 请求服务提供者建立连接
@ -147,86 +282,105 @@ class ServiceConnector:
'conn_id': conn_id 'conn_id': conn_id
}).encode(), self.provider_addr) }).encode(), self.provider_addr)
time.sleep(0.5) # 启动数据转发
# 启动数据转发线程
threading.Thread( threading.Thread(
target=self.forward_data, target=self.forward_data,
args=(conn_id, client_sock), args=(conn_id, client_sock),
daemon=True daemon=True
).start() ).start()
except socket.timeout:
except:
pass pass
except Exception as e:
print(f"TCP监听错误: {str(e)}")
def forward_data(self, conn_id, client_sock): def forward_data(self, conn_id, client_sock):
"""转发本地TCP数据到UDP隧道""" """转发本地TCP数据到UDP通道"""
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
try: try:
while True: while conn_id in self.active_connections:
# 从本地客户端读取数据 # 从本地客户端读取数据
data = client_sock.recv(65535) data = client_sock.recv(32768) # 32KB数据块
if not data: if not data:
break break
# 通过UDP发送给服务提供者 # 通过可靠通道发送
print(f"发送数据到服务提供者: {self.provider_addr}") if conn_id in self.reliable_channels:
self.udp_sock.sendto(json.dumps({ channel = self.reliable_channels[conn_id]
'action': 'data', channel.send(data)
'conn_id': conn_id, except socket.timeout:
'data': data.hex() # 十六进制编码二进制数据
}).encode(), self.provider_addr)
# 分片发送数据
# for i in range(0, len(data), CHUNK_SIZE):
# chunk = data[i:i + CHUNK_SIZE]
# self.udp_sock.sendto(json.dumps({
# 'action': 'data',
# 'conn_id': conn_id,
# 'data': chunk.hex()
# }).encode(), self.provider_addr)
except:
pass pass
except Exception as e:
print(f"数据转发错误: {str(e)}")
finally: finally:
client_sock.close() self.close_connection(conn_id)
if conn_id in self.active_connections:
del self.active_connections[conn_id] def monitor_channel(self, conn_id, channel):
"""监控通道状态和重传"""
while conn_id in self.reliable_channels:
try:
channel.check_retransmit()
# 发送定期ACK
if time.time() - channel.last_ack_time > channel.ack_interval:
channel.send_ack()
channel.last_ack_time = time.time()
time.sleep(0.1)
except Exception as e:
print(f"通道监控错误: {str(e)}")
break
def close_connection(self, conn_id):
"""关闭指定连接"""
if conn_id in self.active_connections:
sock = self.active_connections.pop(conn_id)
sock.close()
if conn_id in self.reliable_channels:
channel = self.reliable_channels.pop(conn_id)
channel.close()
# 通知服务提供端关闭连接
if self.provider_addr:
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), self.provider_addr)
print(f"连接 {conn_id} 已关闭")
def run(self): def run(self):
"""运行服务连接端""" """运行服务连接端"""
# 请求服务
if not self.request_service(): if not self.request_service():
return return
# 执行打洞
if not self.punch_hole(): if not self.punch_hole():
return return
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start() threading.Thread(target=self.udp_listener, daemon=True).start()
# 启动TCP监听线程
threading.Thread(target=self.tcp_listener, daemon=True).start() threading.Thread(target=self.tcp_listener, daemon=True).start()
# 保持主线程运行
try: try:
while self.running: while self.running:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
self.running = False self.shutdown()
self.udp_sock.sendto(json.dumps({
'action': 'stop_client', def shutdown(self):
'client_id': self.client_id """关闭服务连接端"""
}).encode(), self.provider_addr) self.running = False
self.udp_sock.close() # 通知服务提供端停止所有连接
self.tcp_sock.close() self.udp_sock.sendto(json.dumps({
print("服务连接端已停止") 'action': 'stop_client',
'client_id': self.client_id
}).encode(), self.provider_addr)
# 关闭所有连接
for conn_id in list(self.active_connections.keys()):
self.close_connection(conn_id)
self.udp_sock.close()
self.tcp_sock.close()
print("服务连接端已停止")
if __name__ == '__main__': if __name__ == '__main__':
# 配置信息 COORDINATOR_ADDR = ('www.awin-x.top', 5000)
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP SERVICE_NAME = "my_service"
SERVICE_NAME = "my_game_server" LOCAL_PORT = 12345
LOCAL_PORT = 12345 # 本地映射端口
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
connector.run() connector.run()

View File

@ -12,6 +12,7 @@ class CoordinatorServer:
self.clients = defaultdict(dict) # 客户端ID -> 信息 self.clients = defaultdict(dict) # 客户端ID -> 信息
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind((host, port)) self.udp_sock.bind((host, port))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 增加接收缓冲区
self.running = True self.running = True
print(f"协调服务器运行在 {host}:{port}") print(f"协调服务器运行在 {host}:{port}")
@ -59,8 +60,8 @@ class CoordinatorServer:
target_id = [ target_id = [
client_id client_id
for client_id, info in self.clients.items() for client_id, info in self.clients.items()
if info['service_name'] == service_name if info.get('service_name') == service_name
][0] ][0] # 简化处理,取第一个
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
@ -106,7 +107,7 @@ class CoordinatorServer:
print("协调服务器已启动,等待连接...") print("协调服务器已启动,等待连接...")
while self.running: while self.running:
try: try:
data, addr = self.udp_sock.recvfrom(4096) data, addr = self.udp_sock.recvfrom(65535) # 支持更大的数据包
try: try:
message = json.loads(data.decode()) message = json.loads(data.decode())
action = message.get('action') action = message.get('action')

View File

@ -3,6 +3,133 @@ import json
import threading import threading
import time import time
import uuid import uuid
from collections import deque
class ReliableChannel:
"""可靠传输通道类实现序列号、ACK确认和重传"""
def __init__(self, udp_sock, remote_addr):
self.udp_sock = udp_sock
self.remote_addr = remote_addr
self.send_buffer = deque() # 发送缓冲区
self.recv_buffer = {} # 接收缓冲区 (seq -> data)
self.expected_seq = 0 # 期望的下一个序列号
self.last_ack = -1 # 最后确认的序列号
self.ack_interval = 0.2 # ACK发送间隔 (秒)
self.last_ack_time = 0
self.window_size = 10 # 滑动窗口大小
self.retransmit_timers = {} # 重传计时器 {seq: send_time}
self.retransmit_timeout = 0.5 # 重传超时时间 (秒)
self.running = True
def send(self, data, is_control=False):
"""发送数据并管理发送缓冲区"""
if not self.running:
return
# 创建数据包
seq = self.expected_seq
packet = {
'seq': seq,
'data': data.hex(),
'is_control': is_control,
'timestamp': time.time()
}
# 添加到发送缓冲区
self.send_buffer.append(packet)
self.retransmit_timers[seq] = time.time()
self.expected_seq += 1
# 如果窗口有空闲,立即发送
self._send_from_buffer()
def _send_from_buffer(self):
"""从发送缓冲区发送数据包 (按照滑动窗口)"""
# 移除已确认的数据包
while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack:
self.send_buffer.popleft()
# 发送窗口中的数据
for packet in list(self.send_buffer)[:self.window_size]:
self.udp_sock.sendto(json.dumps({
'action': 'data',
'packet': packet
}).encode(), self.remote_addr)
def process_ack(self, ack_seq):
"""处理接收到的ACK"""
if ack_seq > self.last_ack:
self.last_ack = ack_seq
# 清除重传计时器
for seq in list(self.retransmit_timers.keys()):
if seq <= ack_seq:
self.retransmit_timers.pop(seq, None)
# 尝试发送更多数据
self._send_from_buffer()
def process_packet(self, packet):
"""处理接收到的数据包"""
seq = packet['seq']
data = bytes.fromhex(packet['data'])
# 发送ACK
if time.time() - self.last_ack_time > self.ack_interval:
self.send_ack()
self.last_ack_time = time.time()
# 按序接收
if seq == self.expected_seq:
self.expected_seq += 1
# 处理之前缓存的数据
self._process_buffered()
return data
# 乱序接收,缓存数据
elif seq > self.expected_seq:
self.recv_buffer[seq] = data
return None
# 重复数据包
else:
return None
def _process_buffered(self):
"""处理接收缓冲区中的连续数据"""
while self.expected_seq in self.recv_buffer:
data = self.recv_buffer.pop(self.expected_seq)
self.expected_seq += 1
# TODO: 返回数据或交给上层处理
def send_ack(self):
"""发送ACK确认"""
ack_packet = {
'action': 'ack',
'ack_seq': self.expected_seq - 1, # 确认到目前收到的最大连续序列号
'window': self.window_size
}
self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr)
def check_retransmit(self):
"""检查需要重传的数据包"""
now = time.time()
for seq, send_time in list(self.retransmit_timers.items()):
if now - send_time > self.retransmit_timeout:
# 重传数据包
for packet in self.send_buffer:
if packet['seq'] == seq:
self.udp_sock.sendto(json.dumps({
'action': 'data',
'packet': packet
}).encode(), self.remote_addr)
self.retransmit_timers[seq] = now
break
def close(self):
"""关闭通道"""
self.running = False
self.send_buffer.clear()
self.recv_buffer.clear()
self.retransmit_timers.clear()
class ServiceProvider: class ServiceProvider:
@ -12,19 +139,16 @@ class ServiceProvider:
self.internal_port = internal_port self.internal_port = internal_port
self.client_id = f"provider-{uuid.uuid4().hex[:8]}" self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信 # 创建UDP套接字
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0)) self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.punch_port = self.udp_sock.getsockname()[1]
# # 创建TCP套接字用于服务监听 # 存储连接和可靠通道
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.tcp_sock.bind(('0.0.0.0', internal_port))
# self.tcp_sock.listen(5)
print(f"服务端监听在端口 {internal_port}")
# 存储活动连接
self.active_connections = {} self.active_connections = {}
self.reliable_channels = {} # conn_id -> ReliableChannel
self.running = True self.running = True
def register_service(self): def register_service(self):
@ -51,170 +175,200 @@ class ServiceProvider:
"""监听UDP消息""" """监听UDP消息"""
while self.running: while self.running:
try: try:
data, addr = self.udp_sock.recvfrom(4096) data, addr = self.udp_sock.recvfrom(65535)
print(f"收到来自 {addr} 的消息: {data.decode()}") try:
message = json.loads(data.decode()) message = json.loads(data.decode())
action = message.get('action')
if message.get('action') == 'punch' or message.get('action') == 'punch_check': if action == 'punch' or action == 'punch_check':
# 打洞请求 - 发送响应以确认连通性 # 打洞请求 - 发送响应
self.udp_sock.sendto(json.dumps( self.udp_sock.sendto(json.dumps(
{'action': 'punch_response', {'action': 'punch_response', 'client_id': self.client_id}
'client_id': message['client_id'] ).encode(), addr)
}).encode(), addr) elif action == 'punch_response':
print(f"收到来自 {addr} 的打洞请求,已响应") # 打洞响应
elif message.get('action') == 'punch_response': pass
# 打洞响应 - 确认打洞成功 elif action == 'connect':
print(f"收到来自 {addr} 的打洞响应") # 新的连接请求
elif message.get('action') == 'connect': conn_id = message['conn_id']
# 新的连接请求 client_id = message['client_id']
conn_id = message['conn_id'] threading.Thread(
client_id = message['client_id'] target=self.handle_connection,
print(f"收到来自 {addr} 的连接请求") args=(conn_id, addr, client_id),
threading.Thread( daemon=True
target=self.handle_connection, ).start()
args=(conn_id, addr, client_id), elif action == 'punch_request':
daemon=True # 从协调服务器收到的打洞请求
).start() self.punch_hole(message)
elif message.get('action') == 'punch_request': elif action == 'data':
client_addr = message['client_addr'] # 处理数据包
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞") conn_id = message.get('conn_id')
self.punch_hole(message) if conn_id and conn_id in self.reliable_channels:
elif message.get('action') == 'data': channel = self.reliable_channels[conn_id]
# 接收到来自客户端的数据 packet = message['packet']
conn_id = message['conn_id']
data = bytes.fromhex(message['data']) # 处理数据包
if conn_id in self.active_connections: data_chunk = channel.process_packet(packet)
# 转发数据到本地服务 if data_chunk and conn_id in self.active_connections:
print(f"收到来自 {addr} 的数据,转发到本地服务") # 转发数据到本地服务
self.active_connections[conn_id]['local_sock'].sendall(data) self.active_connections[conn_id]['local_sock'].sendall(data_chunk)
else: elif action == 'ack':
print(f"收到来自 {addr} 的数据,但未找到对应的连接") # 处理ACK确认
elif message.get('action') == 'stop_conn': conn_id = message.get('conn_id')
conn_id = message['conn_id'] if conn_id and conn_id in self.reliable_channels:
if conn_id in self.active_connections: channel = self.reliable_channels[conn_id]
self.active_connections[conn_id]['local_sock'].close() ack_seq = message['ack_seq']
self.active_connections.pop(conn_id) channel.process_ack(ack_seq)
elif message.get('action') == 'stop_client': elif action == 'stop_conn':
client_id = message['client_id'] conn_id = message['conn_id']
for conn_id, conn_info in self.active_connections.items(): self.close_connection(conn_id)
if conn_info['client_id'] == client_id: elif action == 'stop_client':
conn_info['local_sock'].close() client_id = message['client_id']
self.active_connections.pop(conn_id) self.close_client_connections(client_id)
else: except json.JSONDecodeError:
print(f"收到未知消息: {message}") # 非JSON数据可能是打洞包
except Exception as e: pass
print(e) except Exception as e:
print(f"UDP监听错误: {str(e)}")
except socket.timeout:
pass pass
except Exception as e:
print(f"UDP监听异常: {str(e)}")
def handle_connection(self, conn_id, client_addr, client_id): def handle_connection(self, conn_id, client_addr, client_id):
"""处理来自客户端的连接""" """处理来自客户端的连接"""
try: try:
# 接受本地服务连接 # 连接到本地服务
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
local_sock.settimeout(5.0)
local_sock.connect(('127.0.0.1', self.internal_port)) local_sock.connect(('127.0.0.1', self.internal_port))
if not local_sock:
print("无法连接到本地服务")
return
# 创建与客户端的UDP隧道 # 创建可靠通道
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}") channel = ReliableChannel(self.udp_sock, client_addr)
self.reliable_channels[conn_id] = channel
# 存储连接 # 存储连接
self.active_connections[conn_id] = { self.active_connections[conn_id] = {
'local_sock': local_sock, 'local_sock': local_sock,
'client_addr': client_addr, 'client_addr': client_addr,
'client_id': client_id 'client_id': client_id,
'channel': channel
} }
# 通知客户端连接就绪 # 通知客户端连接就绪
self.udp_sock.sendto(json.dumps({ self.udp_sock.sendto(json.dumps({
'action': 'connected', 'action': 'connected',
'client_id': conn_id 'conn_id': conn_id
}).encode(), client_addr) }).encode(), client_addr)
# 启动数据转发 # 启动数据转发和通道监控
self.forward_data(conn_id, local_sock, client_addr) threading.Thread(
target=self.forward_data,
args=(conn_id, local_sock, channel),
daemon=True
).start()
threading.Thread(
target=self.monitor_channel,
args=(conn_id, channel),
daemon=True
).start()
print(f"建立连接 {conn_id}: {client_addr} -> 本地服务")
except Exception as e: except Exception as e:
print(f"连接失败: {str(e)}") print(f"连接失败: {str(e)}")
self.udp_sock.sendto(json.dumps({ self.udp_sock.sendto(json.dumps({
'action': 'connect_failed', 'action': 'connect_failed',
'client_id': conn_id, 'conn_id': conn_id,
'message': str(e) 'message': str(e)
}).encode(), client_addr) }).encode(), client_addr)
def forward_data(self, conn_id, local_sock, client_addr): def forward_data(self, conn_id, local_sock, channel):
"""转发TCP数据到UDP隧道""" """转发TCP数据到UDP通道"""
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
try: try:
while True: while conn_id in self.active_connections:
# 从本地服务读取数据 # 从本地服务读取数据
data = local_sock.recv(65535) data = local_sock.recv(32768) # 32KB 数据块
if not data: if not data:
break break
# 通过UDP发送给客户端 # 通过可靠通道发送
self.udp_sock.sendto(json.dumps({ channel.send(data)
'action': 'data', except socket.timeout:
'conn_id': conn_id, pass
'data': data.hex() # 十六进制编码二进制数据
}).encode(), client_addr)
print(f"转发数据给客户端{client_addr}")
# for i in range(0, len(data), CHUNK_SIZE):
# chunk = data[i:i + CHUNK_SIZE]
# self.udp_sock.sendto(json.dumps({
# 'action': 'data',
# 'conn_id': conn_id,
# 'data': chunk.hex()
# }).encode(), client_addr)
except Exception as e: except Exception as e:
print(f"转发数据失败: {str(e)}") print(f"数据转发错误: {str(e)}")
finally: finally:
local_sock.close() self.close_connection(conn_id)
if conn_id in self.active_connections:
del self.active_connections[conn_id] def monitor_channel(self, conn_id, channel):
"""监控通道状态和重传"""
while conn_id in self.active_connections:
try:
channel.check_retransmit()
# 发送定期ACK
if time.time() - channel.last_ack_time > channel.ack_interval:
channel.send_ack()
channel.last_ack_time = time.time()
time.sleep(0.1)
except Exception as e:
print(f"通道监控错误: {str(e)}")
def close_connection(self, conn_id):
"""关闭指定连接"""
if conn_id in self.active_connections:
conn = self.active_connections.pop(conn_id)
conn['local_sock'].close()
if conn_id in self.reliable_channels:
channel = self.reliable_channels.pop(conn_id)
channel.close()
print(f"连接 {conn_id} 已关闭") print(f"连接 {conn_id} 已关闭")
def close_client_connections(self, client_id):
"""关闭指定客户端的连接"""
for conn_id, info in list(self.active_connections.items()):
if info['client_id'] == client_id:
self.close_connection(conn_id)
def run(self): def run(self):
"""运行服务提供端""" """运行服务提供端"""
# 注册服务
self.register_service() self.register_service()
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start() threading.Thread(target=self.udp_listener, daemon=True).start()
# 保持主线程运行
try: try:
while self.running: while self.running:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
self.running = False self.shutdown()
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
for conn_id, conn_info in self.active_connections.items():
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), conn_info['client_addr'])
self.udp_sock.close()
print("服务提供端已停止")
def punch_hole(self, message): def punch_hole(self, message):
"""执行打洞操作"""
for i in range(5): for i in range(5):
try: try:
self.udp_sock.sendto(json.dumps({ self.udp_sock.sendto(json.dumps({
'action': 'punch', 'action': 'punch',
'client_id': message['client_id'] 'client_id': self.client_id
}).encode(), tuple(message['client_addr'])) }).encode(), tuple(message['client_addr']))
time.sleep(0.2) time.sleep(0.2)
except Exception as e: except Exception as e:
print(f"打洞失败: {str(e)}") print(f"打洞失败: {str(e)}")
time.sleep(1)
def shutdown(self):
"""关闭服务提供端"""
self.running = False
# 通知协调服务器停止服务
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
# 关闭所有连接
for conn_id in list(self.active_connections.keys()):
self.close_connection(conn_id)
self.udp_sock.close()
print("服务提供端已停止")
if __name__ == '__main__': if __name__ == '__main__':
# 配置信息 COORDINATOR_ADDR = ('www.awin-x.top', 5000)
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP SERVICE_NAME = "my_service"
SERVICE_NAME = "my_game_server" INTERNAL_PORT = 5001
INTERNAL_PORT = 5001 # 内网游戏服务器端口
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
provider.run() provider.run()