201 lines
7.8 KiB
Python
201 lines
7.8 KiB
Python
import socket
|
|
import json
|
|
import threading
|
|
import time
|
|
import uuid
|
|
|
|
|
|
class ServiceProvider:
|
|
def __init__(self, coordinator_addr, service_name, internal_port):
|
|
self.coordinator_addr = coordinator_addr
|
|
self.service_name = service_name
|
|
self.internal_port = internal_port
|
|
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
|
|
|
|
# 创建UDP套接字用于协调通信
|
|
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.udp_sock.bind(('0.0.0.0', 0))
|
|
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
|
|
|
# # 创建TCP套接字用于服务监听
|
|
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
# self.tcp_sock.bind(('0.0.0.0', internal_port))
|
|
# self.tcp_sock.listen(5)
|
|
print(f"服务端监听在端口 {internal_port}")
|
|
|
|
# 存储活动连接
|
|
self.active_connections = {}
|
|
self.running = True
|
|
|
|
def register_service(self):
|
|
"""向协调服务器注册服务"""
|
|
message = {
|
|
'action': 'register',
|
|
'service_name': self.service_name,
|
|
'internal_port': self.internal_port,
|
|
'external_port': self.udp_sock.getsockname()[1],
|
|
'client_id': self.client_id
|
|
}
|
|
print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'")
|
|
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
|
|
# 等待响应
|
|
data, _ = self.udp_sock.recvfrom(4096)
|
|
response = json.loads(data.decode())
|
|
if response['status'] == 'success':
|
|
print(f"服务 '{self.service_name}' 注册成功")
|
|
else:
|
|
print(f"注册失败: {response['message']}")
|
|
|
|
def udp_listener(self):
|
|
"""监听UDP消息"""
|
|
while self.running:
|
|
try:
|
|
data, addr = self.udp_sock.recvfrom(4096)
|
|
print(f"收到来自 {addr} 的消息: {data.decode()}")
|
|
message = json.loads(data.decode())
|
|
|
|
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
|
|
# 打洞请求 - 发送响应以确认连通性
|
|
self.udp_sock.sendto(json.dumps(
|
|
{'action': 'punch_response',
|
|
'client_id': message['client_id']
|
|
}).encode(), addr)
|
|
print(f"收到来自 {addr} 的打洞请求,已响应")
|
|
elif message.get('action') == 'punch_response':
|
|
# 打洞响应 - 确认打洞成功
|
|
print(f"收到来自 {addr} 的打洞响应")
|
|
elif message.get('action') == 'connect':
|
|
# 新的连接请求
|
|
conn_id = message['conn_id']
|
|
print(f"收到来自 {addr} 的连接请求")
|
|
threading.Thread(
|
|
target=self.handle_connection,
|
|
args=(conn_id, addr),
|
|
daemon=True
|
|
).start()
|
|
elif message.get('action') == 'punch_request':
|
|
client_addr = message['client_addr']
|
|
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
|
|
self.punch_hole(message)
|
|
elif message.get('action') == 'data':
|
|
# 接收到来自客户端的数据
|
|
conn_id = message['conn_id']
|
|
data = bytes.fromhex(message['data'])
|
|
if conn_id in self.active_connections:
|
|
# 转发数据到本地服务
|
|
print(f"收到来自 {addr} 的数据{data}\n,转发到本地服务")
|
|
self.active_connections[conn_id]['local_sock'].sendall(data)
|
|
else:
|
|
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
|
|
else:
|
|
print(f"收到未知消息: {message}")
|
|
except Exception as e:
|
|
print(e)
|
|
pass
|
|
|
|
def handle_connection(self, conn_id, client_addr):
|
|
"""处理来自客户端的连接"""
|
|
try:
|
|
# 接受本地服务连接
|
|
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
local_sock.connect(('127.0.0.1', self.internal_port))
|
|
if not local_sock:
|
|
print("无法连接到本地服务")
|
|
return
|
|
|
|
# 创建与客户端的UDP隧道
|
|
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
|
|
|
|
# 存储连接
|
|
self.active_connections[conn_id] = {
|
|
'local_sock': local_sock,
|
|
'client_addr': client_addr
|
|
}
|
|
|
|
# 通知客户端连接就绪
|
|
self.udp_sock.sendto(json.dumps({
|
|
'action': 'connected',
|
|
'client_id': conn_id
|
|
}).encode(), client_addr)
|
|
|
|
# 启动数据转发
|
|
self.forward_data(conn_id, local_sock, client_addr)
|
|
except Exception as e:
|
|
print(f"连接失败: {str(e)}")
|
|
self.udp_sock.sendto(json.dumps({
|
|
'action': 'connect_failed',
|
|
'client_id': conn_id,
|
|
'message': str(e)
|
|
}).encode(), client_addr)
|
|
|
|
def forward_data(self, conn_id, local_sock, client_addr):
|
|
"""转发TCP数据到UDP隧道"""
|
|
CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小
|
|
try:
|
|
while True:
|
|
# 从本地服务读取数据
|
|
data = local_sock.recv(4096)
|
|
if not data:
|
|
break
|
|
|
|
# # 通过UDP发送给客户端
|
|
# self.udp_sock.sendto(json.dumps({
|
|
# 'action': 'data',
|
|
# 'conn_id': conn_id,
|
|
# 'data': data.hex() # 十六进制编码二进制数据
|
|
# }).encode(), client_addr)
|
|
for i in range(0, len(data), CHUNK_SIZE):
|
|
chunk = data[i:i + CHUNK_SIZE]
|
|
self.udp_sock.sendto(json.dumps({
|
|
'action': 'data',
|
|
'conn_id': conn_id,
|
|
'data': chunk.hex()
|
|
}).encode(), client_addr)
|
|
except Exception as e:
|
|
print(f"转发数据失败: {str(e)}")
|
|
finally:
|
|
local_sock.close()
|
|
if conn_id in self.active_connections:
|
|
del self.active_connections[conn_id]
|
|
print(f"连接 {conn_id} 已关闭")
|
|
|
|
def run(self):
|
|
"""运行服务提供端"""
|
|
# 注册服务
|
|
self.register_service()
|
|
|
|
# 启动UDP监听线程
|
|
threading.Thread(target=self.udp_listener, daemon=True).start()
|
|
|
|
# 保持主线程运行
|
|
try:
|
|
while self.running:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
self.running = False
|
|
self.udp_sock.close()
|
|
print("服务提供端已停止")
|
|
|
|
def punch_hole(self, message):
|
|
for i in range(5):
|
|
try:
|
|
self.udp_sock.sendto(json.dumps({
|
|
'action': 'punch',
|
|
'client_id': message['client_id']
|
|
}).encode(), tuple(message['client_addr']))
|
|
time.sleep(2)
|
|
except Exception as e:
|
|
print(f"打洞失败: {str(e)}")
|
|
time.sleep(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# 配置信息
|
|
COORDINATOR_ADDR = ('127.0.0.1', 5000) # 替换为公网服务器IP
|
|
SERVICE_NAME = "my_game_server"
|
|
INTERNAL_PORT = 8888 # 内网游戏服务器端口
|
|
|
|
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
|
|
provider.run()
|