Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a2364352dd | |||
| 1147ef5a89 | |||
| 2aae52fb69 |
358
connector1.py
358
connector1.py
@ -1,33 +1,154 @@
|
||||
# connector.py
|
||||
import socket
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
import heapq
|
||||
from collections import deque
|
||||
|
||||
|
||||
class ReliableChannel:
|
||||
"""可靠传输通道类(与provider中的类似)"""
|
||||
|
||||
def __init__(self, udp_sock, remote_addr, conn_id):
|
||||
self.udp_sock = udp_sock
|
||||
self.remote_addr = remote_addr
|
||||
self.send_buffer = deque()
|
||||
self.recv_buffer = {}
|
||||
self.recv_queue = deque() # 接收队列
|
||||
self.expected_seq = 0
|
||||
self.last_ack = -1
|
||||
self.ack_interval = 0.2
|
||||
self.conn_id = conn_id
|
||||
self.last_ack_time = 0
|
||||
self.window_size = 10
|
||||
self.retransmit_timers = {}
|
||||
self.retransmit_timeout = 1
|
||||
self.running = True
|
||||
|
||||
def send(self, data, is_control=False):
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
seq = self.expected_seq
|
||||
packet = {
|
||||
'seq': seq,
|
||||
'data': data.hex(),
|
||||
'is_control': is_control,
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
self.send_buffer.append(packet)
|
||||
self.retransmit_timers[seq] = time.time()
|
||||
self.expected_seq += 1
|
||||
self._send_from_buffer()
|
||||
|
||||
def _send_from_buffer(self):
|
||||
while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack:
|
||||
self.send_buffer.popleft()
|
||||
|
||||
for packet in list(self.send_buffer)[:self.window_size]:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'packet': packet,
|
||||
'conn_id': self.conn_id
|
||||
}).encode(), self.remote_addr)
|
||||
|
||||
def process_ack(self, ack_seq):
|
||||
print(f"收到ack: {ack_seq}")
|
||||
if ack_seq > self.last_ack:
|
||||
print(f"更新last_ack: {ack_seq}")
|
||||
self.last_ack = ack_seq
|
||||
for seq in list(self.retransmit_timers.keys()):
|
||||
if seq <= ack_seq:
|
||||
self.retransmit_timers.pop(seq, None)
|
||||
self._send_from_buffer()
|
||||
|
||||
def process_packet(self, packet):
|
||||
seq = packet['seq']
|
||||
data = bytes.fromhex(packet['data'])
|
||||
|
||||
# if time.time() - self.last_ack_time > self.ack_interval:
|
||||
self.send_ack()
|
||||
self.last_ack_time = time.time()
|
||||
|
||||
if seq == self.expected_seq:
|
||||
self.expected_seq += 1
|
||||
self._process_buffered()
|
||||
return data
|
||||
elif seq > self.expected_seq:
|
||||
self.recv_buffer[seq] = data
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
def _process_buffered(self):
|
||||
while self.expected_seq in self.recv_buffer:
|
||||
data = self.recv_buffer.pop(self.expected_seq)
|
||||
# self.expected_seq += 1
|
||||
# self.recv_queue.append(data) # 添加到队列
|
||||
|
||||
def get_received_data(self):
|
||||
"""从接收队列中取出数据"""
|
||||
return self.recv_queue.popleft() if self.recv_queue else None
|
||||
|
||||
def send_ack(self):
|
||||
ack_packet = {
|
||||
'action': 'ack',
|
||||
'ack_seq': self.expected_seq - 1,
|
||||
'window': self.window_size,
|
||||
'conn_id': self.conn_id
|
||||
}
|
||||
self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr)
|
||||
|
||||
def check_retransmit(self):
|
||||
now = time.time()
|
||||
for seq, send_time in list(self.retransmit_timers.items()):
|
||||
if now - send_time > self.retransmit_timeout:
|
||||
print(f'有重传{seq}')
|
||||
for packet in self.send_buffer:
|
||||
if packet['seq'] == seq:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'packet': packet,
|
||||
'conn_id': self.conn_id
|
||||
}).encode(), self.remote_addr)
|
||||
self.retransmit_timers[seq] = now
|
||||
break
|
||||
|
||||
def close(self):
|
||||
self.running = False
|
||||
self.send_buffer.clear()
|
||||
self.recv_buffer.clear()
|
||||
self.retransmit_timers.clear()
|
||||
|
||||
|
||||
class ServiceConnector:
|
||||
def __init__(self, coordinator_addr, service_name, local_port):
|
||||
self.target_id = None
|
||||
self.coordinator_addr = coordinator_addr
|
||||
self.service_name = service_name
|
||||
self.local_port = local_port
|
||||
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# 创建UDP套接字用于协调通信
|
||||
# 创建UDP套接字
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind(('0.0.0.0', 0))
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区
|
||||
self.udp_sock.settimeout(5)
|
||||
|
||||
# 创建TCP套接字用于本地监听
|
||||
# 创建TCP监听套接字
|
||||
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.tcp_sock.bind(('127.0.0.1', local_port))
|
||||
self.tcp_sock.listen(5)
|
||||
self.tcp_sock.listen(10) # 增加监听队列
|
||||
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
||||
|
||||
# 存储活动连接
|
||||
# 连接状态
|
||||
self.active_connections = {}
|
||||
self.reliable_channels = {}
|
||||
self.provider_addr = None
|
||||
self.internal_port = None
|
||||
self.target_id = None
|
||||
self.running = True
|
||||
|
||||
def request_service(self):
|
||||
@ -54,9 +175,6 @@ class ServiceConnector:
|
||||
|
||||
def punch_hole(self):
|
||||
"""执行UDP打洞"""
|
||||
if not self.provider_addr:
|
||||
return False
|
||||
|
||||
# 请求打洞
|
||||
message = {
|
||||
'action': 'punch_request',
|
||||
@ -74,70 +192,107 @@ class ServiceConnector:
|
||||
|
||||
# 向服务提供者发送打洞包
|
||||
print(f"尝试打洞到 {self.provider_addr}...")
|
||||
for _ in range(5):
|
||||
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
|
||||
time.sleep(0.5)
|
||||
for _ in range(10): # 增加打洞尝试次数
|
||||
self.udp_sock.sendto(b'PUNCH', self.provider_addr) # 发送原始UDP包
|
||||
time.sleep(0.2)
|
||||
|
||||
# 检查连通性
|
||||
self.udp_sock.settimeout(10.0)
|
||||
self.udp_sock.settimeout(5.0)
|
||||
try:
|
||||
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
||||
data, addr = self.udp_sock.recvfrom(1024)
|
||||
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
|
||||
print("打洞成功! 已建立UDP连接")
|
||||
data, addr = self.udp_sock.recvfrom(4096)
|
||||
if addr == self.provider_addr:
|
||||
print("打洞成功!")
|
||||
return True
|
||||
else:
|
||||
print(f"错误的打洞响应{data}")
|
||||
print(f"错误的响应来源: {addr}")
|
||||
return False
|
||||
except socket.timeout:
|
||||
print("打洞失败: 未收到响应")
|
||||
return False
|
||||
finally:
|
||||
self.udp_sock.settimeout(None)
|
||||
self.udp_sock.settimeout(0.1)
|
||||
|
||||
def udp_listener(self):
|
||||
"""监听UDP消息"""
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(65535)
|
||||
message = json.loads(data.decode())
|
||||
print(f"收到UDP消息: {data.decode()}")
|
||||
try:
|
||||
message = json.loads(data.decode())
|
||||
action = message.get('action')
|
||||
|
||||
if message['action'] == 'punch_response':
|
||||
# 打洞响应 - 确认连通性
|
||||
print(f"收到来自 {addr} 的打洞响应")
|
||||
elif message['action'] == 'data':
|
||||
if message['conn_id'] in self.active_connections:
|
||||
# 转发数据到本地客户端
|
||||
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}")
|
||||
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data']))
|
||||
if action == 'punch_response':
|
||||
# 打洞响应
|
||||
print(f"收到打洞响应: {message}")
|
||||
pass
|
||||
elif action == 'connected':
|
||||
# 连接建立成功
|
||||
print(f"连接建立成功: {message}")
|
||||
conn_id = message['conn_id']
|
||||
if conn_id in self.active_connections:
|
||||
# 创建可靠通道
|
||||
channel = ReliableChannel(self.udp_sock, self.provider_addr, conn_id)
|
||||
self.reliable_channels[conn_id] = channel
|
||||
|
||||
# 启动通道监控
|
||||
threading.Thread(
|
||||
target=self.monitor_channel,
|
||||
args=(conn_id, channel),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
elif action == 'data':
|
||||
# 处理数据包
|
||||
conn_id = message.get('conn_id')
|
||||
if conn_id and conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels[conn_id]
|
||||
packet = message['packet']
|
||||
|
||||
# 处理数据包
|
||||
data_chunk = channel.process_packet(packet)
|
||||
if data_chunk and conn_id in self.active_connections:
|
||||
# 转发数据到本地客户端
|
||||
print(f"转发数据到本地客户端: {data_chunk}")
|
||||
self.active_connections[conn_id].send(data_chunk)
|
||||
elif action == 'ack':
|
||||
# 处理ACK确认
|
||||
conn_id = message.get('conn_id')
|
||||
if conn_id and conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels[conn_id]
|
||||
ack_seq = message['ack_seq']
|
||||
channel.process_ack(ack_seq)
|
||||
elif action == 'stop_conn':
|
||||
conn_id = message['conn_id']
|
||||
self.close_connection(conn_id)
|
||||
elif action == 'connect_failed':
|
||||
conn_id = message['conn_id']
|
||||
print(f"连接失败: {message['message']}")
|
||||
self.close_connection(conn_id)
|
||||
else:
|
||||
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_conn',
|
||||
'conn_id': message['conn_id']
|
||||
}).encode(), addr)
|
||||
elif message['action'] == 'stop_conn':
|
||||
# 停止连接
|
||||
if message['conn_id'] in self.active_connections:
|
||||
self.active_connections[message['conn_id']].close()
|
||||
del self.active_connections[message['conn_id']]
|
||||
print(f"已关闭本地连接 {message['conn_id']}")
|
||||
else:
|
||||
print(f"收到未知消息: {message}")
|
||||
print(f"未知动作: {action}")
|
||||
except json.JSONDecodeError:
|
||||
print("JSON解析错误")
|
||||
# 原始UDP包可能是打洞确认
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"UDP监听错误: {str(e)}")
|
||||
except socket.timeout:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print(f"UDP监听异常: {str(e)}")
|
||||
|
||||
def tcp_listener(self):
|
||||
"""监听本地TCP连接"""
|
||||
while self.running:
|
||||
try:
|
||||
client_sock, client_addr = self.tcp_sock.accept()
|
||||
client_sock.settimeout(10.0) # 设置超时
|
||||
print(f"新的本地连接来自 {client_addr}")
|
||||
|
||||
# 为每个连接生成唯一ID
|
||||
# 为连接生成唯一ID
|
||||
conn_id = str(uuid.uuid4())
|
||||
|
||||
# 存储连接
|
||||
self.active_connections[conn_id] = client_sock
|
||||
|
||||
# 请求服务提供者建立连接
|
||||
@ -147,86 +302,109 @@ class ServiceConnector:
|
||||
'conn_id': conn_id
|
||||
}).encode(), self.provider_addr)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# 启动数据转发线程
|
||||
# 启动数据转发
|
||||
threading.Thread(
|
||||
target=self.forward_data,
|
||||
args=(conn_id, client_sock),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
except:
|
||||
except socket.timeout:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"TCP监听错误: {str(e)}")
|
||||
|
||||
def forward_data(self, conn_id, client_sock):
|
||||
"""转发本地TCP数据到UDP隧道"""
|
||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
||||
"""转发本地TCP数据到UDP通道"""
|
||||
try:
|
||||
while True:
|
||||
while conn_id not in self.reliable_channels:
|
||||
time.sleep(0.01)
|
||||
while conn_id in self.active_connections:
|
||||
# 从本地客户端读取数据
|
||||
data = client_sock.recv(65535)
|
||||
data = client_sock.recv(32768) # 32KB数据块
|
||||
if not data:
|
||||
break
|
||||
|
||||
# 通过UDP发送给服务提供者
|
||||
print(f"发送数据到服务提供者: {self.provider_addr}")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'conn_id': conn_id,
|
||||
'data': data.hex() # 十六进制编码二进制数据
|
||||
}).encode(), self.provider_addr)
|
||||
# 分片发送数据
|
||||
# for i in range(0, len(data), CHUNK_SIZE):
|
||||
# chunk = data[i:i + CHUNK_SIZE]
|
||||
# self.udp_sock.sendto(json.dumps({
|
||||
# 'action': 'data',
|
||||
# 'conn_id': conn_id,
|
||||
# 'data': chunk.hex()
|
||||
# }).encode(), self.provider_addr)
|
||||
except:
|
||||
# 通过可靠通道发送
|
||||
if conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels[conn_id]
|
||||
print(f"发送数据: {data}")
|
||||
channel.send(data)
|
||||
except socket.timeout:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"数据转发错误: {str(e)}")
|
||||
finally:
|
||||
client_sock.close()
|
||||
if conn_id in self.active_connections:
|
||||
del self.active_connections[conn_id]
|
||||
self.close_connection(conn_id)
|
||||
|
||||
def monitor_channel(self, conn_id, channel):
|
||||
"""监控通道状态和重传"""
|
||||
while conn_id in self.reliable_channels:
|
||||
try:
|
||||
channel.check_retransmit()
|
||||
# 发送定期ACK
|
||||
if time.time() - channel.last_ack_time > channel.ack_interval:
|
||||
channel.send_ack()
|
||||
channel.last_ack_time = time.time()
|
||||
time.sleep(0.1)
|
||||
except Exception as e:
|
||||
print(f"通道监控错误: {str(e)}")
|
||||
break
|
||||
|
||||
def close_connection(self, conn_id):
|
||||
"""关闭指定连接"""
|
||||
if conn_id in self.active_connections:
|
||||
sock = self.active_connections.pop(conn_id)
|
||||
sock.close()
|
||||
if conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels.pop(conn_id)
|
||||
channel.close()
|
||||
|
||||
# 通知服务提供端关闭连接
|
||||
if self.provider_addr:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_conn',
|
||||
'conn_id': conn_id
|
||||
}).encode(), self.provider_addr)
|
||||
|
||||
print(f"连接 {conn_id} 已关闭")
|
||||
|
||||
def run(self):
|
||||
"""运行服务连接端"""
|
||||
# 请求服务
|
||||
if not self.request_service():
|
||||
return
|
||||
|
||||
# 执行打洞
|
||||
if not self.punch_hole():
|
||||
return
|
||||
|
||||
# 启动UDP监听线程
|
||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||
|
||||
# 启动TCP监听线程
|
||||
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
||||
|
||||
# 保持主线程运行
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_client',
|
||||
'client_id': self.client_id
|
||||
}).encode(), self.provider_addr)
|
||||
self.udp_sock.close()
|
||||
self.tcp_sock.close()
|
||||
print("服务连接端已停止")
|
||||
self.shutdown()
|
||||
|
||||
def shutdown(self):
|
||||
"""关闭服务连接端"""
|
||||
self.running = False
|
||||
# 通知服务提供端停止所有连接
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_client',
|
||||
'client_id': self.client_id
|
||||
}).encode(), self.provider_addr)
|
||||
# 关闭所有连接
|
||||
for conn_id in list(self.active_connections.keys()):
|
||||
self.close_connection(conn_id)
|
||||
self.udp_sock.close()
|
||||
self.tcp_sock.close()
|
||||
print("服务连接端已停止")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 配置信息
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
||||
SERVICE_NAME = "my_game_server"
|
||||
LOCAL_PORT = 12345 # 本地映射端口
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000)
|
||||
SERVICE_NAME = "my_service"
|
||||
LOCAL_PORT = 12345
|
||||
|
||||
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
||||
connector.run()
|
||||
|
||||
@ -12,6 +12,7 @@ class CoordinatorServer:
|
||||
self.clients = defaultdict(dict) # 客户端ID -> 信息
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind((host, port))
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 增加接收缓冲区
|
||||
self.running = True
|
||||
print(f"协调服务器运行在 {host}:{port}")
|
||||
|
||||
@ -59,8 +60,8 @@ class CoordinatorServer:
|
||||
target_id = [
|
||||
client_id
|
||||
for client_id, info in self.clients.items()
|
||||
if info['service_name'] == service_name
|
||||
][0]
|
||||
if info.get('service_name') == service_name
|
||||
][0] # 简化处理,取第一个
|
||||
|
||||
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
|
||||
|
||||
@ -106,7 +107,7 @@ class CoordinatorServer:
|
||||
print("协调服务器已启动,等待连接...")
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(4096)
|
||||
data, addr = self.udp_sock.recvfrom(65535) # 支持更大的数据包
|
||||
try:
|
||||
message = json.loads(data.decode())
|
||||
action = message.get('action')
|
||||
@ -137,4 +138,4 @@ class CoordinatorServer:
|
||||
|
||||
if __name__ == '__main__':
|
||||
server = CoordinatorServer()
|
||||
server.run()
|
||||
server.run()
|
||||
397
provider1.py
397
provider1.py
@ -3,6 +3,144 @@ import json
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from collections import deque
|
||||
|
||||
|
||||
class ReliableChannel:
|
||||
"""可靠传输通道类,实现序列号、ACK确认和重传"""
|
||||
|
||||
def __init__(self, udp_sock, remote_addr, conn_id):
|
||||
self.udp_sock = udp_sock
|
||||
self.remote_addr = remote_addr
|
||||
self.send_buffer = deque() # 发送缓冲区
|
||||
self.recv_buffer = {} # 接收缓冲区 (seq -> data)
|
||||
self.recv_queue = deque() # 接收队列
|
||||
self.expected_seq = 0 # 期望的下一个序列号
|
||||
self.last_ack = -1 # 最后确认的序列号
|
||||
self.ack_interval = 0.2 # ACK发送间隔 (秒)
|
||||
self.last_ack_time = 0
|
||||
self.window_size = 10 # 滑动窗口大小
|
||||
self.retransmit_timers = {} # 重传计时器 {seq: send_time}
|
||||
self.retransmit_timeout = 0.5 # 重传超时时间 (秒)
|
||||
self.conn_id = conn_id
|
||||
self.running = True
|
||||
|
||||
def send(self, data, is_control=False):
|
||||
"""发送数据并管理发送缓冲区"""
|
||||
print(f"发送数据: {data}")
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# 创建数据包
|
||||
seq = self.expected_seq
|
||||
packet = {
|
||||
'seq': seq,
|
||||
'data': data.hex(),
|
||||
'is_control': is_control,
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
# 添加到发送缓冲区
|
||||
self.send_buffer.append(packet)
|
||||
self.retransmit_timers[seq] = time.time()
|
||||
self.expected_seq += 1
|
||||
|
||||
# 如果窗口有空闲,立即发送
|
||||
self._send_from_buffer()
|
||||
|
||||
def _send_from_buffer(self):
|
||||
"""从发送缓冲区发送数据包 (按照滑动窗口)"""
|
||||
# 移除已确认的数据包
|
||||
while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack:
|
||||
self.send_buffer.popleft()
|
||||
|
||||
# 发送窗口中的数据
|
||||
for packet in list(self.send_buffer)[:self.window_size]:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'packet': packet,
|
||||
'conn_id': self.conn_id
|
||||
}).encode(), self.remote_addr)
|
||||
|
||||
def process_ack(self, ack_seq):
|
||||
"""处理接收到的ACK"""
|
||||
print(f"收到ACK: {ack_seq}")
|
||||
if ack_seq > self.last_ack:
|
||||
self.last_ack = ack_seq
|
||||
# 清除重传计时器
|
||||
for seq in list(self.retransmit_timers.keys()):
|
||||
if seq <= ack_seq:
|
||||
self.retransmit_timers.pop(seq, None)
|
||||
# 尝试发送更多数据
|
||||
self._send_from_buffer()
|
||||
|
||||
def process_packet(self, packet):
|
||||
"""处理接收到的数据包"""
|
||||
seq = packet['seq']
|
||||
data = bytes.fromhex(packet['data'])
|
||||
|
||||
# 发送ACK
|
||||
if time.time() - self.last_ack_time > self.ack_interval:
|
||||
self.send_ack()
|
||||
self.last_ack_time = time.time()
|
||||
|
||||
# 按序接收
|
||||
if seq == self.expected_seq:
|
||||
self.expected_seq += 1
|
||||
# 处理之前缓存的数据
|
||||
self._process_buffered()
|
||||
return data
|
||||
# 乱序接收,缓存数据
|
||||
elif seq > self.expected_seq:
|
||||
self.recv_buffer[seq] = data
|
||||
return None
|
||||
# 重复数据包
|
||||
else:
|
||||
return None
|
||||
|
||||
def _process_buffered(self):
|
||||
"""处理接收缓冲区中的连续数据"""
|
||||
while self.expected_seq in self.recv_buffer:
|
||||
data = self.recv_buffer.pop(self.expected_seq)
|
||||
# self.expected_seq += 1
|
||||
# self.recv_queue.append(data) # 添加到队列
|
||||
|
||||
def get_received_data(self):
|
||||
"""从接收队列中取出数据"""
|
||||
return self.recv_queue.popleft() if self.recv_queue else None
|
||||
|
||||
def send_ack(self):
|
||||
"""发送ACK确认"""
|
||||
ack_packet = {
|
||||
'action': 'ack',
|
||||
'ack_seq': self.expected_seq - 1, # 确认到目前收到的最大连续序列号
|
||||
'window': self.window_size,
|
||||
'conn_id': self.conn_id
|
||||
}
|
||||
self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr)
|
||||
|
||||
def check_retransmit(self):
|
||||
"""检查需要重传的数据包"""
|
||||
now = time.time()
|
||||
for seq, send_time in list(self.retransmit_timers.items()):
|
||||
if now - send_time > self.retransmit_timeout:
|
||||
# 重传数据包
|
||||
for packet in self.send_buffer:
|
||||
if packet['seq'] == seq:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'packet': packet,
|
||||
'conn_id': self.conn_id
|
||||
}).encode(), self.remote_addr)
|
||||
self.retransmit_timers[seq] = now
|
||||
break
|
||||
|
||||
def close(self):
|
||||
"""关闭通道"""
|
||||
self.running = False
|
||||
self.send_buffer.clear()
|
||||
self.recv_buffer.clear()
|
||||
self.retransmit_timers.clear()
|
||||
|
||||
|
||||
class ServiceProvider:
|
||||
@ -12,19 +150,16 @@ class ServiceProvider:
|
||||
self.internal_port = internal_port
|
||||
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# 创建UDP套接字用于协调通信
|
||||
# 创建UDP套接字
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind(('0.0.0.0', 0))
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.punch_port = self.udp_sock.getsockname()[1]
|
||||
|
||||
# # 创建TCP套接字用于服务监听
|
||||
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# self.tcp_sock.bind(('0.0.0.0', internal_port))
|
||||
# self.tcp_sock.listen(5)
|
||||
print(f"服务端监听在端口 {internal_port}")
|
||||
|
||||
# 存储活动连接
|
||||
# 存储连接和可靠通道
|
||||
self.active_connections = {}
|
||||
self.reliable_channels = {} # conn_id -> ReliableChannel
|
||||
self.running = True
|
||||
|
||||
def register_service(self):
|
||||
@ -51,170 +186,202 @@ class ServiceProvider:
|
||||
"""监听UDP消息"""
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(4096)
|
||||
print(f"收到来自 {addr} 的消息: {data.decode()}")
|
||||
message = json.loads(data.decode())
|
||||
data, addr = self.udp_sock.recvfrom(65535)
|
||||
print(f"收到UDP消息: {data.decode()}")
|
||||
try:
|
||||
message = json.loads(data.decode())
|
||||
action = message.get('action')
|
||||
|
||||
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
|
||||
# 打洞请求 - 发送响应以确认连通性
|
||||
self.udp_sock.sendto(json.dumps(
|
||||
{'action': 'punch_response',
|
||||
'client_id': message['client_id']
|
||||
}).encode(), addr)
|
||||
print(f"收到来自 {addr} 的打洞请求,已响应")
|
||||
elif message.get('action') == 'punch_response':
|
||||
# 打洞响应 - 确认打洞成功
|
||||
print(f"收到来自 {addr} 的打洞响应")
|
||||
elif message.get('action') == 'connect':
|
||||
# 新的连接请求
|
||||
conn_id = message['conn_id']
|
||||
client_id = message['client_id']
|
||||
print(f"收到来自 {addr} 的连接请求")
|
||||
threading.Thread(
|
||||
target=self.handle_connection,
|
||||
args=(conn_id, addr, client_id),
|
||||
daemon=True
|
||||
).start()
|
||||
elif message.get('action') == 'punch_request':
|
||||
client_addr = message['client_addr']
|
||||
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
|
||||
self.punch_hole(message)
|
||||
elif message.get('action') == 'data':
|
||||
# 接收到来自客户端的数据
|
||||
conn_id = message['conn_id']
|
||||
data = bytes.fromhex(message['data'])
|
||||
if conn_id in self.active_connections:
|
||||
# 转发数据到本地服务
|
||||
print(f"收到来自 {addr} 的数据,转发到本地服务")
|
||||
self.active_connections[conn_id]['local_sock'].sendall(data)
|
||||
else:
|
||||
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
|
||||
elif message.get('action') == 'stop_conn':
|
||||
conn_id = message['conn_id']
|
||||
if conn_id in self.active_connections:
|
||||
self.active_connections[conn_id]['local_sock'].close()
|
||||
self.active_connections.pop(conn_id)
|
||||
elif message.get('action') == 'stop_client':
|
||||
client_id = message['client_id']
|
||||
for conn_id, conn_info in self.active_connections.items():
|
||||
if conn_info['client_id'] == client_id:
|
||||
conn_info['local_sock'].close()
|
||||
self.active_connections.pop(conn_id)
|
||||
else:
|
||||
print(f"收到未知消息: {message}")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if action == 'punch' or action == 'punch_check':
|
||||
# 打洞请求 - 发送响应
|
||||
self.udp_sock.sendto(json.dumps(
|
||||
{'action': 'punch_response', 'client_id': self.client_id}
|
||||
).encode(), addr)
|
||||
elif action == 'punch_response':
|
||||
# 打洞响应
|
||||
pass
|
||||
elif action == 'connect':
|
||||
# 新的连接请求
|
||||
conn_id = message['conn_id']
|
||||
client_id = message['client_id']
|
||||
threading.Thread(
|
||||
target=self.handle_connection,
|
||||
args=(conn_id, addr, client_id),
|
||||
daemon=True
|
||||
).start()
|
||||
elif action == 'punch_request':
|
||||
# 从协调服务器收到的打洞请求
|
||||
self.punch_hole(message)
|
||||
elif action == 'data':
|
||||
# 处理数据包
|
||||
conn_id = message.get('conn_id')
|
||||
if conn_id and conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels[conn_id]
|
||||
packet = message['packet']
|
||||
|
||||
# 处理数据包
|
||||
data_chunk = channel.process_packet(packet)
|
||||
if data_chunk and conn_id in self.active_connections:
|
||||
# 转发数据到本地服务
|
||||
print(f"转发数据包到本地服务: {data_chunk[:10]}")
|
||||
self.active_connections[conn_id]['local_sock'].sendall(data_chunk)
|
||||
elif action == 'ack':
|
||||
# 处理ACK确认
|
||||
conn_id = message.get('conn_id')
|
||||
if conn_id and conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels[conn_id]
|
||||
ack_seq = message['ack_seq']
|
||||
channel.process_ack(ack_seq)
|
||||
elif action == 'stop_conn':
|
||||
conn_id = message['conn_id']
|
||||
self.close_connection(conn_id)
|
||||
elif action == 'stop_client':
|
||||
client_id = message['client_id']
|
||||
self.close_client_connections(client_id)
|
||||
except json.JSONDecodeError:
|
||||
# 非JSON数据可能是打洞包
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"UDP监听错误: {str(e)}")
|
||||
except socket.timeout:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"UDP监听异常: {str(e)}")
|
||||
|
||||
def handle_connection(self, conn_id, client_addr, client_id):
|
||||
"""处理来自客户端的连接"""
|
||||
try:
|
||||
# 接受本地服务连接
|
||||
# 连接到本地服务
|
||||
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
local_sock.settimeout(5.0)
|
||||
local_sock.connect(('127.0.0.1', self.internal_port))
|
||||
if not local_sock:
|
||||
print("无法连接到本地服务")
|
||||
return
|
||||
|
||||
# 创建与客户端的UDP隧道
|
||||
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
|
||||
# 创建可靠通道
|
||||
channel = ReliableChannel(self.udp_sock, client_addr, conn_id)
|
||||
self.reliable_channels[conn_id] = channel
|
||||
|
||||
# 存储连接
|
||||
self.active_connections[conn_id] = {
|
||||
'local_sock': local_sock,
|
||||
'client_addr': client_addr,
|
||||
'client_id': client_id
|
||||
'client_id': client_id,
|
||||
'channel': channel
|
||||
}
|
||||
|
||||
# 通知客户端连接就绪
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'connected',
|
||||
'client_id': conn_id
|
||||
'conn_id': conn_id
|
||||
}).encode(), client_addr)
|
||||
|
||||
# 启动数据转发
|
||||
self.forward_data(conn_id, local_sock, client_addr)
|
||||
# 启动数据转发和通道监控
|
||||
threading.Thread(
|
||||
target=self.forward_data,
|
||||
args=(conn_id, local_sock, channel),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
threading.Thread(
|
||||
target=self.monitor_channel,
|
||||
args=(conn_id, channel),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
print(f"建立连接 {conn_id}: {client_addr} -> 本地服务")
|
||||
except Exception as e:
|
||||
print(f"连接失败: {str(e)}")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'connect_failed',
|
||||
'client_id': conn_id,
|
||||
'conn_id': conn_id,
|
||||
'message': str(e)
|
||||
}).encode(), client_addr)
|
||||
|
||||
def forward_data(self, conn_id, local_sock, client_addr):
|
||||
"""转发TCP数据到UDP隧道"""
|
||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
||||
def forward_data(self, conn_id, local_sock, channel):
|
||||
"""转发TCP数据到UDP通道"""
|
||||
try:
|
||||
while True:
|
||||
while conn_id in self.active_connections:
|
||||
# 从本地服务读取数据
|
||||
data = local_sock.recv(65535)
|
||||
data = local_sock.recv(32768) # 32KB 数据块
|
||||
if not data:
|
||||
break
|
||||
|
||||
# 通过UDP发送给客户端
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'conn_id': conn_id,
|
||||
'data': data.hex() # 十六进制编码二进制数据
|
||||
}).encode(), client_addr)
|
||||
print(f"转发数据给客户端{client_addr}")
|
||||
# for i in range(0, len(data), CHUNK_SIZE):
|
||||
# chunk = data[i:i + CHUNK_SIZE]
|
||||
# self.udp_sock.sendto(json.dumps({
|
||||
# 'action': 'data',
|
||||
# 'conn_id': conn_id,
|
||||
# 'data': chunk.hex()
|
||||
# }).encode(), client_addr)
|
||||
# 通过可靠通道发送
|
||||
channel.send(data)
|
||||
except socket.timeout:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"转发数据失败: {str(e)}")
|
||||
print(f"数据转发错误: {str(e)}")
|
||||
finally:
|
||||
local_sock.close()
|
||||
if conn_id in self.active_connections:
|
||||
del self.active_connections[conn_id]
|
||||
self.close_connection(conn_id)
|
||||
|
||||
def monitor_channel(self, conn_id, channel):
|
||||
"""监控通道状态和重传"""
|
||||
while conn_id in self.active_connections:
|
||||
try:
|
||||
channel.check_retransmit()
|
||||
# 发送定期ACK
|
||||
if time.time() - channel.last_ack_time > channel.ack_interval:
|
||||
channel.send_ack()
|
||||
channel.last_ack_time = time.time()
|
||||
time.sleep(0.1)
|
||||
except Exception as e:
|
||||
print(f"通道监控错误: {str(e)}")
|
||||
|
||||
def close_connection(self, conn_id):
|
||||
"""关闭指定连接"""
|
||||
if conn_id in self.active_connections:
|
||||
conn = self.active_connections.pop(conn_id)
|
||||
conn['local_sock'].close()
|
||||
if conn_id in self.reliable_channels:
|
||||
channel = self.reliable_channels.pop(conn_id)
|
||||
channel.close()
|
||||
print(f"连接 {conn_id} 已关闭")
|
||||
|
||||
def close_client_connections(self, client_id):
|
||||
"""关闭指定客户端的连接"""
|
||||
for conn_id, info in list(self.active_connections.items()):
|
||||
if info['client_id'] == client_id:
|
||||
self.close_connection(conn_id)
|
||||
|
||||
def run(self):
|
||||
"""运行服务提供端"""
|
||||
# 注册服务
|
||||
self.register_service()
|
||||
|
||||
# 启动UDP监听线程
|
||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||
|
||||
# 保持主线程运行
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
||||
for conn_id, conn_info in self.active_connections.items():
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_conn',
|
||||
'conn_id': conn_id
|
||||
}).encode(), conn_info['client_addr'])
|
||||
self.udp_sock.close()
|
||||
print("服务提供端已停止")
|
||||
self.shutdown()
|
||||
|
||||
def punch_hole(self, message):
|
||||
"""执行打洞操作"""
|
||||
for i in range(5):
|
||||
try:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'punch',
|
||||
'client_id': message['client_id']
|
||||
'client_id': self.client_id
|
||||
}).encode(), tuple(message['client_addr']))
|
||||
time.sleep(0.2)
|
||||
except Exception as e:
|
||||
print(f"打洞失败: {str(e)}")
|
||||
time.sleep(1)
|
||||
|
||||
def shutdown(self):
|
||||
"""关闭服务提供端"""
|
||||
self.running = False
|
||||
# 通知协调服务器停止服务
|
||||
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
||||
# 关闭所有连接
|
||||
for conn_id in list(self.active_connections.keys()):
|
||||
self.close_connection(conn_id)
|
||||
self.udp_sock.close()
|
||||
print("服务提供端已停止")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 配置信息
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
||||
SERVICE_NAME = "my_game_server"
|
||||
INTERNAL_PORT = 5001 # 内网游戏服务器端口
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000)
|
||||
SERVICE_NAME = "my_service"
|
||||
INTERNAL_PORT = 22
|
||||
|
||||
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
|
||||
provider.run()
|
||||
provider.run()
|
||||
Loading…
Reference in New Issue
Block a user