This commit is contained in:
awin-x 2025-06-08 20:24:57 +08:00
parent 0f8b92f7f8
commit 55c2e50c0d
4 changed files with 0 additions and 360 deletions

View File

@ -1,140 +0,0 @@
import socket
import json
import time
from collections import defaultdict
class CoordinatorServer:
def __init__(self, host='0.0.0.0', port=5000):
self.host = host
self.port = port
self.services = {} # 服务名 -> (公网地址, 内网端口)
self.clients = defaultdict(dict) # 客户端ID -> 信息
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind((host, port))
self.running = True
print(f"协调服务器运行在 {host}:{port}")
def handle_register(self, data, addr):
"""处理服务注册请求"""
try:
service_name = data['service_name']
internal_port = data['internal_port']
client_id = data['client_id']
# 记录客户端信息
self.clients[client_id] = {
'addr': addr,
'service_name': service_name,
'internal_port': internal_port,
'last_seen': time.time()
}
# 注册服务
self.services[service_name] = (addr, internal_port)
print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}")
return {'status': 'success', 'message': '服务注册成功'}
except Exception as e:
return {'status': 'error', 'message': str(e)}
def handle_request(self, data, addr):
"""处理服务请求"""
try:
service_name = data['service_name']
client_id = data['client_id']
if service_name not in self.services:
return {'status': 'error', 'message': '服务未找到'}
# 记录请求客户端信息
self.clients[client_id] = {
'addr': addr,
'service_name': service_name,
'last_seen': time.time()
}
# 获取服务提供者的信息
provider_addr, internal_port = self.services[service_name]
target_id = [
client_id
for client_id, info in self.clients.items()
if info['service_name'] == service_name
][0]
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
return {
'status': 'success',
'provider_addr': provider_addr,
'internal_port': internal_port,
'target_id': target_id
}
except Exception as e:
return {'status': 'error', 'message': str(e)}
def handle_punch_request(self, data, addr):
"""处理打洞请求"""
try:
client_id = data['client_id']
target_id = data['target_id']
# 获取目标客户端信息
if target_id not in self.clients:
return {'status': 'error', 'message': '目标客户端未找到'}
target_addr = self.clients[target_id]['addr']
print(f"打洞请求: {addr} -> {target_addr}")
# 通知双方对方的地址
self.udp_sock.sendto(json.dumps({
'action': 'punch_request',
'client_id': client_id,
'client_addr': addr
}).encode(), target_addr)
return {
'status': 'success',
'target_addr': target_addr
}
except Exception as e:
return {'status': 'error', 'message': str(e)}
def run(self):
"""运行协调服务器"""
print("协调服务器已启动,等待连接...")
while self.running:
try:
data, addr = self.udp_sock.recvfrom(4096)
try:
message = json.loads(data.decode())
action = message.get('action')
if action == 'register':
response = self.handle_register(message, addr)
elif action == 'request':
response = self.handle_request(message, addr)
elif action == 'punch_request':
response = self.handle_punch_request(message, addr)
elif action == 'stop_provider':
self.services.pop(message['service_name'], None)
response = {'status': 'success', 'message': '服务停止成功'}
else:
response = {'status': 'error', 'message': '无效操作'}
self.udp_sock.sendto(json.dumps(response).encode(), addr)
except json.JSONDecodeError:
self.udp_sock.sendto(json.dumps({
'status': 'error',
'message': '无效的JSON数据'
}).encode(), addr)
except Exception as e:
print(f"服务器错误: {str(e)}")
self.udp_sock.close()
if __name__ == '__main__':
server = CoordinatorServer()
server.run()

0
provider.py Normal file
View File

View File

@ -1,220 +0,0 @@
import socket
import json
import threading
import time
import uuid
class ServiceProvider:
def __init__(self, coordinator_addr, service_name, internal_port):
self.coordinator_addr = coordinator_addr
self.service_name = service_name
self.internal_port = internal_port
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# # 创建TCP套接字用于服务监听
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.tcp_sock.bind(('0.0.0.0', internal_port))
# self.tcp_sock.listen(5)
print(f"服务端监听在端口 {internal_port}")
# 存储活动连接
self.active_connections = {}
self.running = True
def register_service(self):
"""向协调服务器注册服务"""
message = {
'action': 'register',
'service_name': self.service_name,
'internal_port': self.internal_port,
'external_port': self.udp_sock.getsockname()[1],
'client_id': self.client_id
}
print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待响应
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] == 'success':
print(f"服务 '{self.service_name}' 注册成功")
else:
print(f"注册失败: {response['message']}")
def udp_listener(self):
"""监听UDP消息"""
while self.running:
try:
data, addr = self.udp_sock.recvfrom(4096)
print(f"收到来自 {addr} 的消息: {data.decode()}")
message = json.loads(data.decode())
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
# 打洞请求 - 发送响应以确认连通性
self.udp_sock.sendto(json.dumps(
{'action': 'punch_response',
'client_id': message['client_id']
}).encode(), addr)
print(f"收到来自 {addr} 的打洞请求,已响应")
elif message.get('action') == 'punch_response':
# 打洞响应 - 确认打洞成功
print(f"收到来自 {addr} 的打洞响应")
elif message.get('action') == 'connect':
# 新的连接请求
conn_id = message['conn_id']
client_id = message['client_id']
print(f"收到来自 {addr} 的连接请求")
threading.Thread(
target=self.handle_connection,
args=(conn_id, addr, client_id),
daemon=True
).start()
elif message.get('action') == 'punch_request':
client_addr = message['client_addr']
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
self.punch_hole(message)
elif message.get('action') == 'data':
# 接收到来自客户端的数据
conn_id = message['conn_id']
data = bytes.fromhex(message['data'])
if conn_id in self.active_connections:
# 转发数据到本地服务
print(f"收到来自 {addr} 的数据,转发到本地服务")
self.active_connections[conn_id]['local_sock'].sendall(data)
else:
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
elif message.get('action') == 'stop_conn':
conn_id = message['conn_id']
if conn_id in self.active_connections:
self.active_connections[conn_id]['local_sock'].close()
self.active_connections.pop(conn_id)
elif message.get('action') == 'stop_client':
client_id = message['client_id']
for conn_id, conn_info in self.active_connections.items():
if conn_info['client_id'] == client_id:
conn_info['local_sock'].close()
self.active_connections.pop(conn_id)
else:
print(f"收到未知消息: {message}")
except Exception as e:
print(e)
pass
def handle_connection(self, conn_id, client_addr, client_id):
"""处理来自客户端的连接"""
try:
# 接受本地服务连接
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
local_sock.connect(('127.0.0.1', self.internal_port))
if not local_sock:
print("无法连接到本地服务")
return
# 创建与客户端的UDP隧道
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
# 存储连接
self.active_connections[conn_id] = {
'local_sock': local_sock,
'client_addr': client_addr,
'client_id': client_id
}
# 通知客户端连接就绪
self.udp_sock.sendto(json.dumps({
'action': 'connected',
'client_id': conn_id
}).encode(), client_addr)
# 启动数据转发
self.forward_data(conn_id, local_sock, client_addr)
except Exception as e:
print(f"连接失败: {str(e)}")
self.udp_sock.sendto(json.dumps({
'action': 'connect_failed',
'client_id': conn_id,
'message': str(e)
}).encode(), client_addr)
def forward_data(self, conn_id, local_sock, client_addr):
"""转发TCP数据到UDP隧道"""
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
try:
while True:
# 从本地服务读取数据
data = local_sock.recv(65535)
if not data:
break
# 通过UDP发送给客户端
self.udp_sock.sendto(json.dumps({
'action': 'data',
'conn_id': conn_id,
'data': data.hex() # 十六进制编码二进制数据
}).encode(), client_addr)
print(f"转发数据给客户端{client_addr}")
# for i in range(0, len(data), CHUNK_SIZE):
# chunk = data[i:i + CHUNK_SIZE]
# self.udp_sock.sendto(json.dumps({
# 'action': 'data',
# 'conn_id': conn_id,
# 'data': chunk.hex()
# }).encode(), client_addr)
except Exception as e:
print(f"转发数据失败: {str(e)}")
finally:
local_sock.close()
if conn_id in self.active_connections:
del self.active_connections[conn_id]
print(f"连接 {conn_id} 已关闭")
def run(self):
"""运行服务提供端"""
# 注册服务
self.register_service()
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start()
# 保持主线程运行
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
self.running = False
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
for conn_id, conn_info in self.active_connections.items():
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), conn_info['client_addr'])
self.udp_sock.close()
print("服务提供端已停止")
def punch_hole(self, message):
for i in range(5):
try:
self.udp_sock.sendto(json.dumps({
'action': 'punch',
'client_id': message['client_id']
}).encode(), tuple(message['client_addr']))
time.sleep(0.2)
except Exception as e:
print(f"打洞失败: {str(e)}")
time.sleep(1)
if __name__ == '__main__':
# 配置信息
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
SERVICE_NAME = "my_game_server"
INTERNAL_PORT = 5001 # 内网游戏服务器端口
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
provider.run()