Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a2364352dd | |||
| 1147ef5a89 | |||
| 2aae52fb69 |
358
connector1.py
358
connector1.py
@ -1,33 +1,154 @@
|
|||||||
|
# connector.py
|
||||||
import socket
|
import socket
|
||||||
import json
|
import json
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
import heapq
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
|
||||||
|
class ReliableChannel:
|
||||||
|
"""可靠传输通道类(与provider中的类似)"""
|
||||||
|
|
||||||
|
def __init__(self, udp_sock, remote_addr, conn_id):
|
||||||
|
self.udp_sock = udp_sock
|
||||||
|
self.remote_addr = remote_addr
|
||||||
|
self.send_buffer = deque()
|
||||||
|
self.recv_buffer = {}
|
||||||
|
self.recv_queue = deque() # 接收队列
|
||||||
|
self.expected_seq = 0
|
||||||
|
self.last_ack = -1
|
||||||
|
self.ack_interval = 0.2
|
||||||
|
self.conn_id = conn_id
|
||||||
|
self.last_ack_time = 0
|
||||||
|
self.window_size = 10
|
||||||
|
self.retransmit_timers = {}
|
||||||
|
self.retransmit_timeout = 1
|
||||||
|
self.running = True
|
||||||
|
|
||||||
|
def send(self, data, is_control=False):
|
||||||
|
if not self.running:
|
||||||
|
return
|
||||||
|
|
||||||
|
seq = self.expected_seq
|
||||||
|
packet = {
|
||||||
|
'seq': seq,
|
||||||
|
'data': data.hex(),
|
||||||
|
'is_control': is_control,
|
||||||
|
'timestamp': time.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
self.send_buffer.append(packet)
|
||||||
|
self.retransmit_timers[seq] = time.time()
|
||||||
|
self.expected_seq += 1
|
||||||
|
self._send_from_buffer()
|
||||||
|
|
||||||
|
def _send_from_buffer(self):
|
||||||
|
while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack:
|
||||||
|
self.send_buffer.popleft()
|
||||||
|
|
||||||
|
for packet in list(self.send_buffer)[:self.window_size]:
|
||||||
|
self.udp_sock.sendto(json.dumps({
|
||||||
|
'action': 'data',
|
||||||
|
'packet': packet,
|
||||||
|
'conn_id': self.conn_id
|
||||||
|
}).encode(), self.remote_addr)
|
||||||
|
|
||||||
|
def process_ack(self, ack_seq):
|
||||||
|
print(f"收到ack: {ack_seq}")
|
||||||
|
if ack_seq > self.last_ack:
|
||||||
|
print(f"更新last_ack: {ack_seq}")
|
||||||
|
self.last_ack = ack_seq
|
||||||
|
for seq in list(self.retransmit_timers.keys()):
|
||||||
|
if seq <= ack_seq:
|
||||||
|
self.retransmit_timers.pop(seq, None)
|
||||||
|
self._send_from_buffer()
|
||||||
|
|
||||||
|
def process_packet(self, packet):
|
||||||
|
seq = packet['seq']
|
||||||
|
data = bytes.fromhex(packet['data'])
|
||||||
|
|
||||||
|
# if time.time() - self.last_ack_time > self.ack_interval:
|
||||||
|
self.send_ack()
|
||||||
|
self.last_ack_time = time.time()
|
||||||
|
|
||||||
|
if seq == self.expected_seq:
|
||||||
|
self.expected_seq += 1
|
||||||
|
self._process_buffered()
|
||||||
|
return data
|
||||||
|
elif seq > self.expected_seq:
|
||||||
|
self.recv_buffer[seq] = data
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _process_buffered(self):
|
||||||
|
while self.expected_seq in self.recv_buffer:
|
||||||
|
data = self.recv_buffer.pop(self.expected_seq)
|
||||||
|
# self.expected_seq += 1
|
||||||
|
# self.recv_queue.append(data) # 添加到队列
|
||||||
|
|
||||||
|
def get_received_data(self):
|
||||||
|
"""从接收队列中取出数据"""
|
||||||
|
return self.recv_queue.popleft() if self.recv_queue else None
|
||||||
|
|
||||||
|
def send_ack(self):
|
||||||
|
ack_packet = {
|
||||||
|
'action': 'ack',
|
||||||
|
'ack_seq': self.expected_seq - 1,
|
||||||
|
'window': self.window_size,
|
||||||
|
'conn_id': self.conn_id
|
||||||
|
}
|
||||||
|
self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr)
|
||||||
|
|
||||||
|
def check_retransmit(self):
|
||||||
|
now = time.time()
|
||||||
|
for seq, send_time in list(self.retransmit_timers.items()):
|
||||||
|
if now - send_time > self.retransmit_timeout:
|
||||||
|
print(f'有重传{seq}')
|
||||||
|
for packet in self.send_buffer:
|
||||||
|
if packet['seq'] == seq:
|
||||||
|
self.udp_sock.sendto(json.dumps({
|
||||||
|
'action': 'data',
|
||||||
|
'packet': packet,
|
||||||
|
'conn_id': self.conn_id
|
||||||
|
}).encode(), self.remote_addr)
|
||||||
|
self.retransmit_timers[seq] = now
|
||||||
|
break
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.running = False
|
||||||
|
self.send_buffer.clear()
|
||||||
|
self.recv_buffer.clear()
|
||||||
|
self.retransmit_timers.clear()
|
||||||
|
|
||||||
|
|
||||||
class ServiceConnector:
|
class ServiceConnector:
|
||||||
def __init__(self, coordinator_addr, service_name, local_port):
|
def __init__(self, coordinator_addr, service_name, local_port):
|
||||||
self.target_id = None
|
|
||||||
self.coordinator_addr = coordinator_addr
|
self.coordinator_addr = coordinator_addr
|
||||||
self.service_name = service_name
|
self.service_name = service_name
|
||||||
self.local_port = local_port
|
self.local_port = local_port
|
||||||
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
||||||
|
|
||||||
# 创建UDP套接字用于协调通信
|
# 创建UDP套接字
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
self.udp_sock.bind(('0.0.0.0', 0))
|
self.udp_sock.bind(('0.0.0.0', 0))
|
||||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区
|
||||||
|
self.udp_sock.settimeout(5)
|
||||||
|
|
||||||
# 创建TCP套接字用于本地监听
|
# 创建TCP监听套接字
|
||||||
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
self.tcp_sock.bind(('127.0.0.1', local_port))
|
self.tcp_sock.bind(('127.0.0.1', local_port))
|
||||||
self.tcp_sock.listen(5)
|
self.tcp_sock.listen(10) # 增加监听队列
|
||||||
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
||||||
|
|
||||||
# 存储活动连接
|
# 连接状态
|
||||||
self.active_connections = {}
|
self.active_connections = {}
|
||||||
|
self.reliable_channels = {}
|
||||||
self.provider_addr = None
|
self.provider_addr = None
|
||||||
self.internal_port = None
|
self.internal_port = None
|
||||||
|
self.target_id = None
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
def request_service(self):
|
def request_service(self):
|
||||||
@ -54,9 +175,6 @@ class ServiceConnector:
|
|||||||
|
|
||||||
def punch_hole(self):
|
def punch_hole(self):
|
||||||
"""执行UDP打洞"""
|
"""执行UDP打洞"""
|
||||||
if not self.provider_addr:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# 请求打洞
|
# 请求打洞
|
||||||
message = {
|
message = {
|
||||||
'action': 'punch_request',
|
'action': 'punch_request',
|
||||||
@ -74,70 +192,107 @@ class ServiceConnector:
|
|||||||
|
|
||||||
# 向服务提供者发送打洞包
|
# 向服务提供者发送打洞包
|
||||||
print(f"尝试打洞到 {self.provider_addr}...")
|
print(f"尝试打洞到 {self.provider_addr}...")
|
||||||
for _ in range(5):
|
for _ in range(10): # 增加打洞尝试次数
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
|
self.udp_sock.sendto(b'PUNCH', self.provider_addr) # 发送原始UDP包
|
||||||
time.sleep(0.5)
|
time.sleep(0.2)
|
||||||
|
|
||||||
# 检查连通性
|
# 检查连通性
|
||||||
self.udp_sock.settimeout(10.0)
|
self.udp_sock.settimeout(5.0)
|
||||||
try:
|
try:
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
||||||
data, addr = self.udp_sock.recvfrom(1024)
|
data, addr = self.udp_sock.recvfrom(4096)
|
||||||
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
|
if addr == self.provider_addr:
|
||||||
print("打洞成功! 已建立UDP连接")
|
print("打洞成功!")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
print(f"错误的打洞响应{data}")
|
print(f"错误的响应来源: {addr}")
|
||||||
return False
|
return False
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
print("打洞失败: 未收到响应")
|
print("打洞失败: 未收到响应")
|
||||||
return False
|
return False
|
||||||
finally:
|
finally:
|
||||||
self.udp_sock.settimeout(None)
|
self.udp_sock.settimeout(0.1)
|
||||||
|
|
||||||
def udp_listener(self):
|
def udp_listener(self):
|
||||||
"""监听UDP消息"""
|
"""监听UDP消息"""
|
||||||
while self.running:
|
while self.running:
|
||||||
try:
|
try:
|
||||||
data, addr = self.udp_sock.recvfrom(65535)
|
data, addr = self.udp_sock.recvfrom(65535)
|
||||||
message = json.loads(data.decode())
|
print(f"收到UDP消息: {data.decode()}")
|
||||||
|
try:
|
||||||
|
message = json.loads(data.decode())
|
||||||
|
action = message.get('action')
|
||||||
|
|
||||||
if message['action'] == 'punch_response':
|
if action == 'punch_response':
|
||||||
# 打洞响应 - 确认连通性
|
# 打洞响应
|
||||||
print(f"收到来自 {addr} 的打洞响应")
|
print(f"收到打洞响应: {message}")
|
||||||
elif message['action'] == 'data':
|
pass
|
||||||
if message['conn_id'] in self.active_connections:
|
elif action == 'connected':
|
||||||
# 转发数据到本地客户端
|
# 连接建立成功
|
||||||
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}")
|
print(f"连接建立成功: {message}")
|
||||||
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data']))
|
conn_id = message['conn_id']
|
||||||
|
if conn_id in self.active_connections:
|
||||||
|
# 创建可靠通道
|
||||||
|
channel = ReliableChannel(self.udp_sock, self.provider_addr, conn_id)
|
||||||
|
self.reliable_channels[conn_id] = channel
|
||||||
|
|
||||||
|
# 启动通道监控
|
||||||
|
threading.Thread(
|
||||||
|
target=self.monitor_channel,
|
||||||
|
args=(conn_id, channel),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
elif action == 'data':
|
||||||
|
# 处理数据包
|
||||||
|
conn_id = message.get('conn_id')
|
||||||
|
if conn_id and conn_id in self.reliable_channels:
|
||||||
|
channel = self.reliable_channels[conn_id]
|
||||||
|
packet = message['packet']
|
||||||
|
|
||||||
|
# 处理数据包
|
||||||
|
data_chunk = channel.process_packet(packet)
|
||||||
|
if data_chunk and conn_id in self.active_connections:
|
||||||
|
# 转发数据到本地客户端
|
||||||
|
print(f"转发数据到本地客户端: {data_chunk}")
|
||||||
|
self.active_connections[conn_id].send(data_chunk)
|
||||||
|
elif action == 'ack':
|
||||||
|
# 处理ACK确认
|
||||||
|
conn_id = message.get('conn_id')
|
||||||
|
if conn_id and conn_id in self.reliable_channels:
|
||||||
|
channel = self.reliable_channels[conn_id]
|
||||||
|
ack_seq = message['ack_seq']
|
||||||
|
channel.process_ack(ack_seq)
|
||||||
|
elif action == 'stop_conn':
|
||||||
|
conn_id = message['conn_id']
|
||||||
|
self.close_connection(conn_id)
|
||||||
|
elif action == 'connect_failed':
|
||||||
|
conn_id = message['conn_id']
|
||||||
|
print(f"连接失败: {message['message']}")
|
||||||
|
self.close_connection(conn_id)
|
||||||
else:
|
else:
|
||||||
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
|
print(f"未知动作: {action}")
|
||||||
self.udp_sock.sendto(json.dumps({
|
except json.JSONDecodeError:
|
||||||
'action': 'stop_conn',
|
print("JSON解析错误")
|
||||||
'conn_id': message['conn_id']
|
# 原始UDP包可能是打洞确认
|
||||||
}).encode(), addr)
|
pass
|
||||||
elif message['action'] == 'stop_conn':
|
except Exception as e:
|
||||||
# 停止连接
|
print(f"UDP监听错误: {str(e)}")
|
||||||
if message['conn_id'] in self.active_connections:
|
except socket.timeout:
|
||||||
self.active_connections[message['conn_id']].close()
|
pass
|
||||||
del self.active_connections[message['conn_id']]
|
|
||||||
print(f"已关闭本地连接 {message['conn_id']}")
|
|
||||||
else:
|
|
||||||
print(f"收到未知消息: {message}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(f"UDP监听异常: {str(e)}")
|
||||||
|
|
||||||
def tcp_listener(self):
|
def tcp_listener(self):
|
||||||
"""监听本地TCP连接"""
|
"""监听本地TCP连接"""
|
||||||
while self.running:
|
while self.running:
|
||||||
try:
|
try:
|
||||||
client_sock, client_addr = self.tcp_sock.accept()
|
client_sock, client_addr = self.tcp_sock.accept()
|
||||||
|
client_sock.settimeout(10.0) # 设置超时
|
||||||
print(f"新的本地连接来自 {client_addr}")
|
print(f"新的本地连接来自 {client_addr}")
|
||||||
|
|
||||||
# 为每个连接生成唯一ID
|
# 为连接生成唯一ID
|
||||||
conn_id = str(uuid.uuid4())
|
conn_id = str(uuid.uuid4())
|
||||||
|
|
||||||
# 存储连接
|
|
||||||
self.active_connections[conn_id] = client_sock
|
self.active_connections[conn_id] = client_sock
|
||||||
|
|
||||||
# 请求服务提供者建立连接
|
# 请求服务提供者建立连接
|
||||||
@ -147,86 +302,109 @@ class ServiceConnector:
|
|||||||
'conn_id': conn_id
|
'conn_id': conn_id
|
||||||
}).encode(), self.provider_addr)
|
}).encode(), self.provider_addr)
|
||||||
|
|
||||||
time.sleep(0.5)
|
# 启动数据转发
|
||||||
|
|
||||||
# 启动数据转发线程
|
|
||||||
threading.Thread(
|
threading.Thread(
|
||||||
target=self.forward_data,
|
target=self.forward_data,
|
||||||
args=(conn_id, client_sock),
|
args=(conn_id, client_sock),
|
||||||
daemon=True
|
daemon=True
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
except:
|
except socket.timeout:
|
||||||
pass
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"TCP监听错误: {str(e)}")
|
||||||
|
|
||||||
def forward_data(self, conn_id, client_sock):
|
def forward_data(self, conn_id, client_sock):
|
||||||
"""转发本地TCP数据到UDP隧道"""
|
"""转发本地TCP数据到UDP通道"""
|
||||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while conn_id not in self.reliable_channels:
|
||||||
|
time.sleep(0.01)
|
||||||
|
while conn_id in self.active_connections:
|
||||||
# 从本地客户端读取数据
|
# 从本地客户端读取数据
|
||||||
data = client_sock.recv(65535)
|
data = client_sock.recv(32768) # 32KB数据块
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
|
|
||||||
# 通过UDP发送给服务提供者
|
# 通过可靠通道发送
|
||||||
print(f"发送数据到服务提供者: {self.provider_addr}")
|
if conn_id in self.reliable_channels:
|
||||||
self.udp_sock.sendto(json.dumps({
|
channel = self.reliable_channels[conn_id]
|
||||||
'action': 'data',
|
print(f"发送数据: {data}")
|
||||||
'conn_id': conn_id,
|
channel.send(data)
|
||||||
'data': data.hex() # 十六进制编码二进制数据
|
except socket.timeout:
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
# 分片发送数据
|
|
||||||
# for i in range(0, len(data), CHUNK_SIZE):
|
|
||||||
# chunk = data[i:i + CHUNK_SIZE]
|
|
||||||
# self.udp_sock.sendto(json.dumps({
|
|
||||||
# 'action': 'data',
|
|
||||||
# 'conn_id': conn_id,
|
|
||||||
# 'data': chunk.hex()
|
|
||||||
# }).encode(), self.provider_addr)
|
|
||||||
except:
|
|
||||||
pass
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"数据转发错误: {str(e)}")
|
||||||
finally:
|
finally:
|
||||||
client_sock.close()
|
self.close_connection(conn_id)
|
||||||
if conn_id in self.active_connections:
|
|
||||||
del self.active_connections[conn_id]
|
def monitor_channel(self, conn_id, channel):
|
||||||
|
"""监控通道状态和重传"""
|
||||||
|
while conn_id in self.reliable_channels:
|
||||||
|
try:
|
||||||
|
channel.check_retransmit()
|
||||||
|
# 发送定期ACK
|
||||||
|
if time.time() - channel.last_ack_time > channel.ack_interval:
|
||||||
|
channel.send_ack()
|
||||||
|
channel.last_ack_time = time.time()
|
||||||
|
time.sleep(0.1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"通道监控错误: {str(e)}")
|
||||||
|
break
|
||||||
|
|
||||||
|
def close_connection(self, conn_id):
|
||||||
|
"""关闭指定连接"""
|
||||||
|
if conn_id in self.active_connections:
|
||||||
|
sock = self.active_connections.pop(conn_id)
|
||||||
|
sock.close()
|
||||||
|
if conn_id in self.reliable_channels:
|
||||||
|
channel = self.reliable_channels.pop(conn_id)
|
||||||
|
channel.close()
|
||||||
|
|
||||||
|
# 通知服务提供端关闭连接
|
||||||
|
if self.provider_addr:
|
||||||
|
self.udp_sock.sendto(json.dumps({
|
||||||
|
'action': 'stop_conn',
|
||||||
|
'conn_id': conn_id
|
||||||
|
}).encode(), self.provider_addr)
|
||||||
|
|
||||||
|
print(f"连接 {conn_id} 已关闭")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""运行服务连接端"""
|
"""运行服务连接端"""
|
||||||
# 请求服务
|
|
||||||
if not self.request_service():
|
if not self.request_service():
|
||||||
return
|
return
|
||||||
|
|
||||||
# 执行打洞
|
|
||||||
if not self.punch_hole():
|
if not self.punch_hole():
|
||||||
return
|
return
|
||||||
|
|
||||||
# 启动UDP监听线程
|
|
||||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||||
|
|
||||||
# 启动TCP监听线程
|
|
||||||
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
||||||
|
|
||||||
# 保持主线程运行
|
|
||||||
try:
|
try:
|
||||||
while self.running:
|
while self.running:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
self.running = False
|
self.shutdown()
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_client',
|
def shutdown(self):
|
||||||
'client_id': self.client_id
|
"""关闭服务连接端"""
|
||||||
}).encode(), self.provider_addr)
|
self.running = False
|
||||||
self.udp_sock.close()
|
# 通知服务提供端停止所有连接
|
||||||
self.tcp_sock.close()
|
self.udp_sock.sendto(json.dumps({
|
||||||
print("服务连接端已停止")
|
'action': 'stop_client',
|
||||||
|
'client_id': self.client_id
|
||||||
|
}).encode(), self.provider_addr)
|
||||||
|
# 关闭所有连接
|
||||||
|
for conn_id in list(self.active_connections.keys()):
|
||||||
|
self.close_connection(conn_id)
|
||||||
|
self.udp_sock.close()
|
||||||
|
self.tcp_sock.close()
|
||||||
|
print("服务连接端已停止")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# 配置信息
|
COORDINATOR_ADDR = ('www.awin-x.top', 5000)
|
||||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
SERVICE_NAME = "my_service"
|
||||||
SERVICE_NAME = "my_game_server"
|
LOCAL_PORT = 12345
|
||||||
LOCAL_PORT = 12345 # 本地映射端口
|
|
||||||
|
|
||||||
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
||||||
connector.run()
|
connector.run()
|
||||||
|
|||||||
@ -12,6 +12,7 @@ class CoordinatorServer:
|
|||||||
self.clients = defaultdict(dict) # 客户端ID -> 信息
|
self.clients = defaultdict(dict) # 客户端ID -> 信息
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
self.udp_sock.bind((host, port))
|
self.udp_sock.bind((host, port))
|
||||||
|
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 增加接收缓冲区
|
||||||
self.running = True
|
self.running = True
|
||||||
print(f"协调服务器运行在 {host}:{port}")
|
print(f"协调服务器运行在 {host}:{port}")
|
||||||
|
|
||||||
@ -59,8 +60,8 @@ class CoordinatorServer:
|
|||||||
target_id = [
|
target_id = [
|
||||||
client_id
|
client_id
|
||||||
for client_id, info in self.clients.items()
|
for client_id, info in self.clients.items()
|
||||||
if info['service_name'] == service_name
|
if info.get('service_name') == service_name
|
||||||
][0]
|
][0] # 简化处理,取第一个
|
||||||
|
|
||||||
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
|
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
|
||||||
|
|
||||||
@ -106,7 +107,7 @@ class CoordinatorServer:
|
|||||||
print("协调服务器已启动,等待连接...")
|
print("协调服务器已启动,等待连接...")
|
||||||
while self.running:
|
while self.running:
|
||||||
try:
|
try:
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
data, addr = self.udp_sock.recvfrom(65535) # 支持更大的数据包
|
||||||
try:
|
try:
|
||||||
message = json.loads(data.decode())
|
message = json.loads(data.decode())
|
||||||
action = message.get('action')
|
action = message.get('action')
|
||||||
@ -137,4 +138,4 @@ class CoordinatorServer:
|
|||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
server = CoordinatorServer()
|
server = CoordinatorServer()
|
||||||
server.run()
|
server.run()
|
||||||
397
provider1.py
397
provider1.py
@ -3,6 +3,144 @@ import json
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
|
||||||
|
class ReliableChannel:
|
||||||
|
"""可靠传输通道类,实现序列号、ACK确认和重传"""
|
||||||
|
|
||||||
|
def __init__(self, udp_sock, remote_addr, conn_id):
|
||||||
|
self.udp_sock = udp_sock
|
||||||
|
self.remote_addr = remote_addr
|
||||||
|
self.send_buffer = deque() # 发送缓冲区
|
||||||
|
self.recv_buffer = {} # 接收缓冲区 (seq -> data)
|
||||||
|
self.recv_queue = deque() # 接收队列
|
||||||
|
self.expected_seq = 0 # 期望的下一个序列号
|
||||||
|
self.last_ack = -1 # 最后确认的序列号
|
||||||
|
self.ack_interval = 0.2 # ACK发送间隔 (秒)
|
||||||
|
self.last_ack_time = 0
|
||||||
|
self.window_size = 10 # 滑动窗口大小
|
||||||
|
self.retransmit_timers = {} # 重传计时器 {seq: send_time}
|
||||||
|
self.retransmit_timeout = 0.5 # 重传超时时间 (秒)
|
||||||
|
self.conn_id = conn_id
|
||||||
|
self.running = True
|
||||||
|
|
||||||
|
def send(self, data, is_control=False):
|
||||||
|
"""发送数据并管理发送缓冲区"""
|
||||||
|
print(f"发送数据: {data}")
|
||||||
|
if not self.running:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 创建数据包
|
||||||
|
seq = self.expected_seq
|
||||||
|
packet = {
|
||||||
|
'seq': seq,
|
||||||
|
'data': data.hex(),
|
||||||
|
'is_control': is_control,
|
||||||
|
'timestamp': time.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
# 添加到发送缓冲区
|
||||||
|
self.send_buffer.append(packet)
|
||||||
|
self.retransmit_timers[seq] = time.time()
|
||||||
|
self.expected_seq += 1
|
||||||
|
|
||||||
|
# 如果窗口有空闲,立即发送
|
||||||
|
self._send_from_buffer()
|
||||||
|
|
||||||
|
def _send_from_buffer(self):
|
||||||
|
"""从发送缓冲区发送数据包 (按照滑动窗口)"""
|
||||||
|
# 移除已确认的数据包
|
||||||
|
while self.send_buffer and self.send_buffer[0]['seq'] <= self.last_ack:
|
||||||
|
self.send_buffer.popleft()
|
||||||
|
|
||||||
|
# 发送窗口中的数据
|
||||||
|
for packet in list(self.send_buffer)[:self.window_size]:
|
||||||
|
self.udp_sock.sendto(json.dumps({
|
||||||
|
'action': 'data',
|
||||||
|
'packet': packet,
|
||||||
|
'conn_id': self.conn_id
|
||||||
|
}).encode(), self.remote_addr)
|
||||||
|
|
||||||
|
def process_ack(self, ack_seq):
|
||||||
|
"""处理接收到的ACK"""
|
||||||
|
print(f"收到ACK: {ack_seq}")
|
||||||
|
if ack_seq > self.last_ack:
|
||||||
|
self.last_ack = ack_seq
|
||||||
|
# 清除重传计时器
|
||||||
|
for seq in list(self.retransmit_timers.keys()):
|
||||||
|
if seq <= ack_seq:
|
||||||
|
self.retransmit_timers.pop(seq, None)
|
||||||
|
# 尝试发送更多数据
|
||||||
|
self._send_from_buffer()
|
||||||
|
|
||||||
|
def process_packet(self, packet):
|
||||||
|
"""处理接收到的数据包"""
|
||||||
|
seq = packet['seq']
|
||||||
|
data = bytes.fromhex(packet['data'])
|
||||||
|
|
||||||
|
# 发送ACK
|
||||||
|
if time.time() - self.last_ack_time > self.ack_interval:
|
||||||
|
self.send_ack()
|
||||||
|
self.last_ack_time = time.time()
|
||||||
|
|
||||||
|
# 按序接收
|
||||||
|
if seq == self.expected_seq:
|
||||||
|
self.expected_seq += 1
|
||||||
|
# 处理之前缓存的数据
|
||||||
|
self._process_buffered()
|
||||||
|
return data
|
||||||
|
# 乱序接收,缓存数据
|
||||||
|
elif seq > self.expected_seq:
|
||||||
|
self.recv_buffer[seq] = data
|
||||||
|
return None
|
||||||
|
# 重复数据包
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _process_buffered(self):
|
||||||
|
"""处理接收缓冲区中的连续数据"""
|
||||||
|
while self.expected_seq in self.recv_buffer:
|
||||||
|
data = self.recv_buffer.pop(self.expected_seq)
|
||||||
|
# self.expected_seq += 1
|
||||||
|
# self.recv_queue.append(data) # 添加到队列
|
||||||
|
|
||||||
|
def get_received_data(self):
|
||||||
|
"""从接收队列中取出数据"""
|
||||||
|
return self.recv_queue.popleft() if self.recv_queue else None
|
||||||
|
|
||||||
|
def send_ack(self):
|
||||||
|
"""发送ACK确认"""
|
||||||
|
ack_packet = {
|
||||||
|
'action': 'ack',
|
||||||
|
'ack_seq': self.expected_seq - 1, # 确认到目前收到的最大连续序列号
|
||||||
|
'window': self.window_size,
|
||||||
|
'conn_id': self.conn_id
|
||||||
|
}
|
||||||
|
self.udp_sock.sendto(json.dumps(ack_packet).encode(), self.remote_addr)
|
||||||
|
|
||||||
|
def check_retransmit(self):
|
||||||
|
"""检查需要重传的数据包"""
|
||||||
|
now = time.time()
|
||||||
|
for seq, send_time in list(self.retransmit_timers.items()):
|
||||||
|
if now - send_time > self.retransmit_timeout:
|
||||||
|
# 重传数据包
|
||||||
|
for packet in self.send_buffer:
|
||||||
|
if packet['seq'] == seq:
|
||||||
|
self.udp_sock.sendto(json.dumps({
|
||||||
|
'action': 'data',
|
||||||
|
'packet': packet,
|
||||||
|
'conn_id': self.conn_id
|
||||||
|
}).encode(), self.remote_addr)
|
||||||
|
self.retransmit_timers[seq] = now
|
||||||
|
break
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""关闭通道"""
|
||||||
|
self.running = False
|
||||||
|
self.send_buffer.clear()
|
||||||
|
self.recv_buffer.clear()
|
||||||
|
self.retransmit_timers.clear()
|
||||||
|
|
||||||
|
|
||||||
class ServiceProvider:
|
class ServiceProvider:
|
||||||
@ -12,19 +150,16 @@ class ServiceProvider:
|
|||||||
self.internal_port = internal_port
|
self.internal_port = internal_port
|
||||||
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
|
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
|
||||||
|
|
||||||
# 创建UDP套接字用于协调通信
|
# 创建UDP套接字
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
self.udp_sock.bind(('0.0.0.0', 0))
|
self.udp_sock.bind(('0.0.0.0', 0))
|
||||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB接收缓冲区
|
||||||
|
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
self.punch_port = self.udp_sock.getsockname()[1]
|
||||||
|
|
||||||
# # 创建TCP套接字用于服务监听
|
# 存储连接和可靠通道
|
||||||
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
# self.tcp_sock.bind(('0.0.0.0', internal_port))
|
|
||||||
# self.tcp_sock.listen(5)
|
|
||||||
print(f"服务端监听在端口 {internal_port}")
|
|
||||||
|
|
||||||
# 存储活动连接
|
|
||||||
self.active_connections = {}
|
self.active_connections = {}
|
||||||
|
self.reliable_channels = {} # conn_id -> ReliableChannel
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
def register_service(self):
|
def register_service(self):
|
||||||
@ -51,170 +186,202 @@ class ServiceProvider:
|
|||||||
"""监听UDP消息"""
|
"""监听UDP消息"""
|
||||||
while self.running:
|
while self.running:
|
||||||
try:
|
try:
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
data, addr = self.udp_sock.recvfrom(65535)
|
||||||
print(f"收到来自 {addr} 的消息: {data.decode()}")
|
print(f"收到UDP消息: {data.decode()}")
|
||||||
message = json.loads(data.decode())
|
try:
|
||||||
|
message = json.loads(data.decode())
|
||||||
|
action = message.get('action')
|
||||||
|
|
||||||
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
|
if action == 'punch' or action == 'punch_check':
|
||||||
# 打洞请求 - 发送响应以确认连通性
|
# 打洞请求 - 发送响应
|
||||||
self.udp_sock.sendto(json.dumps(
|
self.udp_sock.sendto(json.dumps(
|
||||||
{'action': 'punch_response',
|
{'action': 'punch_response', 'client_id': self.client_id}
|
||||||
'client_id': message['client_id']
|
).encode(), addr)
|
||||||
}).encode(), addr)
|
elif action == 'punch_response':
|
||||||
print(f"收到来自 {addr} 的打洞请求,已响应")
|
# 打洞响应
|
||||||
elif message.get('action') == 'punch_response':
|
pass
|
||||||
# 打洞响应 - 确认打洞成功
|
elif action == 'connect':
|
||||||
print(f"收到来自 {addr} 的打洞响应")
|
# 新的连接请求
|
||||||
elif message.get('action') == 'connect':
|
conn_id = message['conn_id']
|
||||||
# 新的连接请求
|
client_id = message['client_id']
|
||||||
conn_id = message['conn_id']
|
threading.Thread(
|
||||||
client_id = message['client_id']
|
target=self.handle_connection,
|
||||||
print(f"收到来自 {addr} 的连接请求")
|
args=(conn_id, addr, client_id),
|
||||||
threading.Thread(
|
daemon=True
|
||||||
target=self.handle_connection,
|
).start()
|
||||||
args=(conn_id, addr, client_id),
|
elif action == 'punch_request':
|
||||||
daemon=True
|
# 从协调服务器收到的打洞请求
|
||||||
).start()
|
self.punch_hole(message)
|
||||||
elif message.get('action') == 'punch_request':
|
elif action == 'data':
|
||||||
client_addr = message['client_addr']
|
# 处理数据包
|
||||||
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
|
conn_id = message.get('conn_id')
|
||||||
self.punch_hole(message)
|
if conn_id and conn_id in self.reliable_channels:
|
||||||
elif message.get('action') == 'data':
|
channel = self.reliable_channels[conn_id]
|
||||||
# 接收到来自客户端的数据
|
packet = message['packet']
|
||||||
conn_id = message['conn_id']
|
|
||||||
data = bytes.fromhex(message['data'])
|
# 处理数据包
|
||||||
if conn_id in self.active_connections:
|
data_chunk = channel.process_packet(packet)
|
||||||
# 转发数据到本地服务
|
if data_chunk and conn_id in self.active_connections:
|
||||||
print(f"收到来自 {addr} 的数据,转发到本地服务")
|
# 转发数据到本地服务
|
||||||
self.active_connections[conn_id]['local_sock'].sendall(data)
|
print(f"转发数据包到本地服务: {data_chunk[:10]}")
|
||||||
else:
|
self.active_connections[conn_id]['local_sock'].sendall(data_chunk)
|
||||||
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
|
elif action == 'ack':
|
||||||
elif message.get('action') == 'stop_conn':
|
# 处理ACK确认
|
||||||
conn_id = message['conn_id']
|
conn_id = message.get('conn_id')
|
||||||
if conn_id in self.active_connections:
|
if conn_id and conn_id in self.reliable_channels:
|
||||||
self.active_connections[conn_id]['local_sock'].close()
|
channel = self.reliable_channels[conn_id]
|
||||||
self.active_connections.pop(conn_id)
|
ack_seq = message['ack_seq']
|
||||||
elif message.get('action') == 'stop_client':
|
channel.process_ack(ack_seq)
|
||||||
client_id = message['client_id']
|
elif action == 'stop_conn':
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
conn_id = message['conn_id']
|
||||||
if conn_info['client_id'] == client_id:
|
self.close_connection(conn_id)
|
||||||
conn_info['local_sock'].close()
|
elif action == 'stop_client':
|
||||||
self.active_connections.pop(conn_id)
|
client_id = message['client_id']
|
||||||
else:
|
self.close_client_connections(client_id)
|
||||||
print(f"收到未知消息: {message}")
|
except json.JSONDecodeError:
|
||||||
except Exception as e:
|
# 非JSON数据可能是打洞包
|
||||||
print(e)
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"UDP监听错误: {str(e)}")
|
||||||
|
except socket.timeout:
|
||||||
pass
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"UDP监听异常: {str(e)}")
|
||||||
|
|
||||||
def handle_connection(self, conn_id, client_addr, client_id):
|
def handle_connection(self, conn_id, client_addr, client_id):
|
||||||
"""处理来自客户端的连接"""
|
"""处理来自客户端的连接"""
|
||||||
try:
|
try:
|
||||||
# 接受本地服务连接
|
# 连接到本地服务
|
||||||
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
local_sock.settimeout(5.0)
|
||||||
local_sock.connect(('127.0.0.1', self.internal_port))
|
local_sock.connect(('127.0.0.1', self.internal_port))
|
||||||
if not local_sock:
|
|
||||||
print("无法连接到本地服务")
|
|
||||||
return
|
|
||||||
|
|
||||||
# 创建与客户端的UDP隧道
|
# 创建可靠通道
|
||||||
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
|
channel = ReliableChannel(self.udp_sock, client_addr, conn_id)
|
||||||
|
self.reliable_channels[conn_id] = channel
|
||||||
|
|
||||||
# 存储连接
|
# 存储连接
|
||||||
self.active_connections[conn_id] = {
|
self.active_connections[conn_id] = {
|
||||||
'local_sock': local_sock,
|
'local_sock': local_sock,
|
||||||
'client_addr': client_addr,
|
'client_addr': client_addr,
|
||||||
'client_id': client_id
|
'client_id': client_id,
|
||||||
|
'channel': channel
|
||||||
}
|
}
|
||||||
|
|
||||||
# 通知客户端连接就绪
|
# 通知客户端连接就绪
|
||||||
self.udp_sock.sendto(json.dumps({
|
self.udp_sock.sendto(json.dumps({
|
||||||
'action': 'connected',
|
'action': 'connected',
|
||||||
'client_id': conn_id
|
'conn_id': conn_id
|
||||||
}).encode(), client_addr)
|
}).encode(), client_addr)
|
||||||
|
|
||||||
# 启动数据转发
|
# 启动数据转发和通道监控
|
||||||
self.forward_data(conn_id, local_sock, client_addr)
|
threading.Thread(
|
||||||
|
target=self.forward_data,
|
||||||
|
args=(conn_id, local_sock, channel),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
threading.Thread(
|
||||||
|
target=self.monitor_channel,
|
||||||
|
args=(conn_id, channel),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
|
||||||
|
print(f"建立连接 {conn_id}: {client_addr} -> 本地服务")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"连接失败: {str(e)}")
|
print(f"连接失败: {str(e)}")
|
||||||
self.udp_sock.sendto(json.dumps({
|
self.udp_sock.sendto(json.dumps({
|
||||||
'action': 'connect_failed',
|
'action': 'connect_failed',
|
||||||
'client_id': conn_id,
|
'conn_id': conn_id,
|
||||||
'message': str(e)
|
'message': str(e)
|
||||||
}).encode(), client_addr)
|
}).encode(), client_addr)
|
||||||
|
|
||||||
def forward_data(self, conn_id, local_sock, client_addr):
|
def forward_data(self, conn_id, local_sock, channel):
|
||||||
"""转发TCP数据到UDP隧道"""
|
"""转发TCP数据到UDP通道"""
|
||||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while conn_id in self.active_connections:
|
||||||
# 从本地服务读取数据
|
# 从本地服务读取数据
|
||||||
data = local_sock.recv(65535)
|
data = local_sock.recv(32768) # 32KB 数据块
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
|
|
||||||
# 通过UDP发送给客户端
|
# 通过可靠通道发送
|
||||||
self.udp_sock.sendto(json.dumps({
|
channel.send(data)
|
||||||
'action': 'data',
|
except socket.timeout:
|
||||||
'conn_id': conn_id,
|
pass
|
||||||
'data': data.hex() # 十六进制编码二进制数据
|
|
||||||
}).encode(), client_addr)
|
|
||||||
print(f"转发数据给客户端{client_addr}")
|
|
||||||
# for i in range(0, len(data), CHUNK_SIZE):
|
|
||||||
# chunk = data[i:i + CHUNK_SIZE]
|
|
||||||
# self.udp_sock.sendto(json.dumps({
|
|
||||||
# 'action': 'data',
|
|
||||||
# 'conn_id': conn_id,
|
|
||||||
# 'data': chunk.hex()
|
|
||||||
# }).encode(), client_addr)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"转发数据失败: {str(e)}")
|
print(f"数据转发错误: {str(e)}")
|
||||||
finally:
|
finally:
|
||||||
local_sock.close()
|
self.close_connection(conn_id)
|
||||||
if conn_id in self.active_connections:
|
|
||||||
del self.active_connections[conn_id]
|
def monitor_channel(self, conn_id, channel):
|
||||||
|
"""监控通道状态和重传"""
|
||||||
|
while conn_id in self.active_connections:
|
||||||
|
try:
|
||||||
|
channel.check_retransmit()
|
||||||
|
# 发送定期ACK
|
||||||
|
if time.time() - channel.last_ack_time > channel.ack_interval:
|
||||||
|
channel.send_ack()
|
||||||
|
channel.last_ack_time = time.time()
|
||||||
|
time.sleep(0.1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"通道监控错误: {str(e)}")
|
||||||
|
|
||||||
|
def close_connection(self, conn_id):
|
||||||
|
"""关闭指定连接"""
|
||||||
|
if conn_id in self.active_connections:
|
||||||
|
conn = self.active_connections.pop(conn_id)
|
||||||
|
conn['local_sock'].close()
|
||||||
|
if conn_id in self.reliable_channels:
|
||||||
|
channel = self.reliable_channels.pop(conn_id)
|
||||||
|
channel.close()
|
||||||
print(f"连接 {conn_id} 已关闭")
|
print(f"连接 {conn_id} 已关闭")
|
||||||
|
|
||||||
|
def close_client_connections(self, client_id):
|
||||||
|
"""关闭指定客户端的连接"""
|
||||||
|
for conn_id, info in list(self.active_connections.items()):
|
||||||
|
if info['client_id'] == client_id:
|
||||||
|
self.close_connection(conn_id)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""运行服务提供端"""
|
"""运行服务提供端"""
|
||||||
# 注册服务
|
|
||||||
self.register_service()
|
self.register_service()
|
||||||
|
|
||||||
# 启动UDP监听线程
|
|
||||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||||
|
|
||||||
# 保持主线程运行
|
|
||||||
try:
|
try:
|
||||||
while self.running:
|
while self.running:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
self.running = False
|
self.shutdown()
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_conn',
|
|
||||||
'conn_id': conn_id
|
|
||||||
}).encode(), conn_info['client_addr'])
|
|
||||||
self.udp_sock.close()
|
|
||||||
print("服务提供端已停止")
|
|
||||||
|
|
||||||
def punch_hole(self, message):
|
def punch_hole(self, message):
|
||||||
|
"""执行打洞操作"""
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
try:
|
try:
|
||||||
self.udp_sock.sendto(json.dumps({
|
self.udp_sock.sendto(json.dumps({
|
||||||
'action': 'punch',
|
'action': 'punch',
|
||||||
'client_id': message['client_id']
|
'client_id': self.client_id
|
||||||
}).encode(), tuple(message['client_addr']))
|
}).encode(), tuple(message['client_addr']))
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"打洞失败: {str(e)}")
|
print(f"打洞失败: {str(e)}")
|
||||||
time.sleep(1)
|
|
||||||
|
def shutdown(self):
|
||||||
|
"""关闭服务提供端"""
|
||||||
|
self.running = False
|
||||||
|
# 通知协调服务器停止服务
|
||||||
|
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
||||||
|
# 关闭所有连接
|
||||||
|
for conn_id in list(self.active_connections.keys()):
|
||||||
|
self.close_connection(conn_id)
|
||||||
|
self.udp_sock.close()
|
||||||
|
print("服务提供端已停止")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# 配置信息
|
COORDINATOR_ADDR = ('www.awin-x.top', 5000)
|
||||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
SERVICE_NAME = "my_service"
|
||||||
SERVICE_NAME = "my_game_server"
|
INTERNAL_PORT = 22
|
||||||
INTERNAL_PORT = 5001 # 内网游戏服务器端口
|
|
||||||
|
|
||||||
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
|
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
|
||||||
provider.run()
|
provider.run()
|
||||||
Loading…
Reference in New Issue
Block a user