Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d50d9227bf | |||
| 82b355b32b | |||
| 55c2e50c0d |
0
connector.py
Normal file
0
connector.py
Normal file
232
connector1.py
232
connector1.py
@ -1,232 +0,0 @@
|
|||||||
import socket
|
|
||||||
import json
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceConnector:
|
|
||||||
def __init__(self, coordinator_addr, service_name, local_port):
|
|
||||||
self.target_id = None
|
|
||||||
self.coordinator_addr = coordinator_addr
|
|
||||||
self.service_name = service_name
|
|
||||||
self.local_port = local_port
|
|
||||||
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
|
||||||
|
|
||||||
# 创建UDP套接字用于协调通信
|
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
self.udp_sock.bind(('0.0.0.0', 0))
|
|
||||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
|
||||||
|
|
||||||
# 创建TCP套接字用于本地监听
|
|
||||||
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
self.tcp_sock.bind(('127.0.0.1', local_port))
|
|
||||||
self.tcp_sock.listen(5)
|
|
||||||
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
|
||||||
|
|
||||||
# 存储活动连接
|
|
||||||
self.active_connections = {}
|
|
||||||
self.provider_addr = None
|
|
||||||
self.internal_port = None
|
|
||||||
self.running = True
|
|
||||||
|
|
||||||
def request_service(self):
|
|
||||||
"""向协调服务器请求服务"""
|
|
||||||
message = {
|
|
||||||
'action': 'request',
|
|
||||||
'service_name': self.service_name,
|
|
||||||
'client_id': self.client_id
|
|
||||||
}
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
|
|
||||||
# 等待响应
|
|
||||||
data, _ = self.udp_sock.recvfrom(4096)
|
|
||||||
response = json.loads(data.decode())
|
|
||||||
if response['status'] == 'success':
|
|
||||||
self.provider_addr = tuple(response['provider_addr'])
|
|
||||||
self.internal_port = response['internal_port']
|
|
||||||
self.target_id = response['target_id']
|
|
||||||
print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
print(f"服务请求失败: {response['message']}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def punch_hole(self):
|
|
||||||
"""执行UDP打洞"""
|
|
||||||
if not self.provider_addr:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# 请求打洞
|
|
||||||
message = {
|
|
||||||
'action': 'punch_request',
|
|
||||||
'client_id': self.client_id,
|
|
||||||
'target_id': self.target_id
|
|
||||||
}
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
|
|
||||||
# 等待协调服务器响应
|
|
||||||
data, _ = self.udp_sock.recvfrom(4096)
|
|
||||||
response = json.loads(data.decode())
|
|
||||||
if response['status'] != 'success':
|
|
||||||
print(f"打洞请求失败: {response['message']}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# 向服务提供者发送打洞包
|
|
||||||
print(f"尝试打洞到 {self.provider_addr}...")
|
|
||||||
for _ in range(5):
|
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
|
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
# 检查连通性
|
|
||||||
self.udp_sock.settimeout(10.0)
|
|
||||||
try:
|
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
|
||||||
data, addr = self.udp_sock.recvfrom(1024)
|
|
||||||
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
|
|
||||||
print("打洞成功! 已建立UDP连接")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
print(f"错误的打洞响应{data}")
|
|
||||||
return False
|
|
||||||
except socket.timeout:
|
|
||||||
print("打洞失败: 未收到响应")
|
|
||||||
return False
|
|
||||||
finally:
|
|
||||||
self.udp_sock.settimeout(None)
|
|
||||||
|
|
||||||
def udp_listener(self):
|
|
||||||
"""监听UDP消息"""
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
data, addr = self.udp_sock.recvfrom(65535)
|
|
||||||
message = json.loads(data.decode())
|
|
||||||
|
|
||||||
if message['action'] == 'punch_response':
|
|
||||||
# 打洞响应 - 确认连通性
|
|
||||||
print(f"收到来自 {addr} 的打洞响应")
|
|
||||||
elif message['action'] == 'data':
|
|
||||||
if message['conn_id'] in self.active_connections:
|
|
||||||
# 转发数据到本地客户端
|
|
||||||
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}")
|
|
||||||
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data']))
|
|
||||||
else:
|
|
||||||
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_conn',
|
|
||||||
'conn_id': message['conn_id']
|
|
||||||
}).encode(), addr)
|
|
||||||
elif message['action'] == 'stop_conn':
|
|
||||||
# 停止连接
|
|
||||||
if message['conn_id'] in self.active_connections:
|
|
||||||
self.active_connections[message['conn_id']].close()
|
|
||||||
del self.active_connections[message['conn_id']]
|
|
||||||
print(f"已关闭本地连接 {message['conn_id']}")
|
|
||||||
else:
|
|
||||||
print(f"收到未知消息: {message}")
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
def tcp_listener(self):
|
|
||||||
"""监听本地TCP连接"""
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
client_sock, client_addr = self.tcp_sock.accept()
|
|
||||||
print(f"新的本地连接来自 {client_addr}")
|
|
||||||
|
|
||||||
# 为每个连接生成唯一ID
|
|
||||||
conn_id = str(uuid.uuid4())
|
|
||||||
|
|
||||||
# 存储连接
|
|
||||||
self.active_connections[conn_id] = client_sock
|
|
||||||
|
|
||||||
# 请求服务提供者建立连接
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'connect',
|
|
||||||
'client_id': self.client_id,
|
|
||||||
'conn_id': conn_id
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
|
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
# 启动数据转发线程
|
|
||||||
threading.Thread(
|
|
||||||
target=self.forward_data,
|
|
||||||
args=(conn_id, client_sock),
|
|
||||||
daemon=True
|
|
||||||
).start()
|
|
||||||
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def forward_data(self, conn_id, client_sock):
|
|
||||||
"""转发本地TCP数据到UDP隧道"""
|
|
||||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
# 从本地客户端读取数据
|
|
||||||
data = client_sock.recv(65535)
|
|
||||||
if not data:
|
|
||||||
break
|
|
||||||
|
|
||||||
# 通过UDP发送给服务提供者
|
|
||||||
print(f"发送数据到服务提供者: {self.provider_addr}")
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'data',
|
|
||||||
'conn_id': conn_id,
|
|
||||||
'data': data.hex() # 十六进制编码二进制数据
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
# 分片发送数据
|
|
||||||
# for i in range(0, len(data), CHUNK_SIZE):
|
|
||||||
# chunk = data[i:i + CHUNK_SIZE]
|
|
||||||
# self.udp_sock.sendto(json.dumps({
|
|
||||||
# 'action': 'data',
|
|
||||||
# 'conn_id': conn_id,
|
|
||||||
# 'data': chunk.hex()
|
|
||||||
# }).encode(), self.provider_addr)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
client_sock.close()
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
del self.active_connections[conn_id]
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""运行服务连接端"""
|
|
||||||
# 请求服务
|
|
||||||
if not self.request_service():
|
|
||||||
return
|
|
||||||
|
|
||||||
# 执行打洞
|
|
||||||
if not self.punch_hole():
|
|
||||||
return
|
|
||||||
|
|
||||||
# 启动UDP监听线程
|
|
||||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
|
||||||
|
|
||||||
# 启动TCP监听线程
|
|
||||||
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
|
||||||
|
|
||||||
# 保持主线程运行
|
|
||||||
try:
|
|
||||||
while self.running:
|
|
||||||
time.sleep(1)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
self.running = False
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_client',
|
|
||||||
'client_id': self.client_id
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
self.udp_sock.close()
|
|
||||||
self.tcp_sock.close()
|
|
||||||
print("服务连接端已停止")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# 配置信息
|
|
||||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
|
||||||
SERVICE_NAME = "my_game_server"
|
|
||||||
LOCAL_PORT = 12345 # 本地映射端口
|
|
||||||
|
|
||||||
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
|
||||||
connector.run()
|
|
||||||
140
coordinator.py
140
coordinator.py
@ -1,140 +0,0 @@
|
|||||||
import socket
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from collections import defaultdict
|
|
||||||
|
|
||||||
|
|
||||||
class CoordinatorServer:
|
|
||||||
def __init__(self, host='0.0.0.0', port=5000):
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.services = {} # 服务名 -> (公网地址, 内网端口)
|
|
||||||
self.clients = defaultdict(dict) # 客户端ID -> 信息
|
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
self.udp_sock.bind((host, port))
|
|
||||||
self.running = True
|
|
||||||
print(f"协调服务器运行在 {host}:{port}")
|
|
||||||
|
|
||||||
def handle_register(self, data, addr):
|
|
||||||
"""处理服务注册请求"""
|
|
||||||
try:
|
|
||||||
service_name = data['service_name']
|
|
||||||
internal_port = data['internal_port']
|
|
||||||
client_id = data['client_id']
|
|
||||||
|
|
||||||
# 记录客户端信息
|
|
||||||
self.clients[client_id] = {
|
|
||||||
'addr': addr,
|
|
||||||
'service_name': service_name,
|
|
||||||
'internal_port': internal_port,
|
|
||||||
'last_seen': time.time()
|
|
||||||
}
|
|
||||||
|
|
||||||
# 注册服务
|
|
||||||
self.services[service_name] = (addr, internal_port)
|
|
||||||
print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}")
|
|
||||||
|
|
||||||
return {'status': 'success', 'message': '服务注册成功'}
|
|
||||||
except Exception as e:
|
|
||||||
return {'status': 'error', 'message': str(e)}
|
|
||||||
|
|
||||||
def handle_request(self, data, addr):
|
|
||||||
"""处理服务请求"""
|
|
||||||
try:
|
|
||||||
service_name = data['service_name']
|
|
||||||
client_id = data['client_id']
|
|
||||||
|
|
||||||
if service_name not in self.services:
|
|
||||||
return {'status': 'error', 'message': '服务未找到'}
|
|
||||||
|
|
||||||
# 记录请求客户端信息
|
|
||||||
self.clients[client_id] = {
|
|
||||||
'addr': addr,
|
|
||||||
'service_name': service_name,
|
|
||||||
'last_seen': time.time()
|
|
||||||
}
|
|
||||||
|
|
||||||
# 获取服务提供者的信息
|
|
||||||
provider_addr, internal_port = self.services[service_name]
|
|
||||||
target_id = [
|
|
||||||
client_id
|
|
||||||
for client_id, info in self.clients.items()
|
|
||||||
if info['service_name'] == service_name
|
|
||||||
][0]
|
|
||||||
|
|
||||||
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
|
|
||||||
|
|
||||||
return {
|
|
||||||
'status': 'success',
|
|
||||||
'provider_addr': provider_addr,
|
|
||||||
'internal_port': internal_port,
|
|
||||||
'target_id': target_id
|
|
||||||
}
|
|
||||||
except Exception as e:
|
|
||||||
return {'status': 'error', 'message': str(e)}
|
|
||||||
|
|
||||||
def handle_punch_request(self, data, addr):
|
|
||||||
"""处理打洞请求"""
|
|
||||||
try:
|
|
||||||
client_id = data['client_id']
|
|
||||||
target_id = data['target_id']
|
|
||||||
|
|
||||||
# 获取目标客户端信息
|
|
||||||
if target_id not in self.clients:
|
|
||||||
return {'status': 'error', 'message': '目标客户端未找到'}
|
|
||||||
|
|
||||||
target_addr = self.clients[target_id]['addr']
|
|
||||||
|
|
||||||
print(f"打洞请求: {addr} -> {target_addr}")
|
|
||||||
|
|
||||||
# 通知双方对方的地址
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'punch_request',
|
|
||||||
'client_id': client_id,
|
|
||||||
'client_addr': addr
|
|
||||||
}).encode(), target_addr)
|
|
||||||
|
|
||||||
return {
|
|
||||||
'status': 'success',
|
|
||||||
'target_addr': target_addr
|
|
||||||
}
|
|
||||||
except Exception as e:
|
|
||||||
return {'status': 'error', 'message': str(e)}
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""运行协调服务器"""
|
|
||||||
print("协调服务器已启动,等待连接...")
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
|
||||||
try:
|
|
||||||
message = json.loads(data.decode())
|
|
||||||
action = message.get('action')
|
|
||||||
|
|
||||||
if action == 'register':
|
|
||||||
response = self.handle_register(message, addr)
|
|
||||||
elif action == 'request':
|
|
||||||
response = self.handle_request(message, addr)
|
|
||||||
elif action == 'punch_request':
|
|
||||||
response = self.handle_punch_request(message, addr)
|
|
||||||
elif action == 'stop_provider':
|
|
||||||
self.services.pop(message['service_name'], None)
|
|
||||||
response = {'status': 'success', 'message': '服务停止成功'}
|
|
||||||
else:
|
|
||||||
response = {'status': 'error', 'message': '无效操作'}
|
|
||||||
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'status': 'error',
|
|
||||||
'message': '无效的JSON数据'
|
|
||||||
}).encode(), addr)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"服务器错误: {str(e)}")
|
|
||||||
|
|
||||||
self.udp_sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
server = CoordinatorServer()
|
|
||||||
server.run()
|
|
||||||
50
provider.py
Normal file
50
provider.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
from collections import deque
|
||||||
|
import threading
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from utiles import *
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceConnection:
|
||||||
|
def __init__(self, conn_id: str, provider_sock: socket.socket, forward_sock: socket.socket, connector_addr: tuple):
|
||||||
|
self.conn_id = conn_id
|
||||||
|
self.provider_sock = provider_sock
|
||||||
|
self.forward_sock = forward_sock
|
||||||
|
self.connector_addr = connector_addr
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
def forward_thread(self):
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
data = self.provider_sock.recv(1024)
|
||||||
|
pack = Package('data', {'conn_id': self.conn_id}, data)
|
||||||
|
self.forward_sock.sendto(pack.pack(), self.connector_addr)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"收到提供端数据,转发失败: {e}")
|
||||||
|
break
|
||||||
|
self.running = False
|
||||||
|
if self.provider_sock:
|
||||||
|
self.provider_sock.close()
|
||||||
|
self.provider_sock = None
|
||||||
|
return
|
||||||
|
|
||||||
|
def backward_data(self, data):
|
||||||
|
try:
|
||||||
|
self.provider_sock.sendall(data)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"收到客户端数据,转发失败: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.running = True
|
||||||
|
threading.Thread(target=self.forward_thread, daemon=True).start()
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceProvider:
|
||||||
|
def __init__(self, service_name: str, host: str, port: int, sock: socket.socket):
|
||||||
|
self.service_name = service_name
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.sock = sock
|
||||||
220
provider1.py
220
provider1.py
@ -1,220 +0,0 @@
|
|||||||
import socket
|
|
||||||
import json
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceProvider:
|
|
||||||
def __init__(self, coordinator_addr, service_name, internal_port):
|
|
||||||
self.coordinator_addr = coordinator_addr
|
|
||||||
self.service_name = service_name
|
|
||||||
self.internal_port = internal_port
|
|
||||||
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
|
|
||||||
|
|
||||||
# 创建UDP套接字用于协调通信
|
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
self.udp_sock.bind(('0.0.0.0', 0))
|
|
||||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
|
||||||
|
|
||||||
# # 创建TCP套接字用于服务监听
|
|
||||||
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
# self.tcp_sock.bind(('0.0.0.0', internal_port))
|
|
||||||
# self.tcp_sock.listen(5)
|
|
||||||
print(f"服务端监听在端口 {internal_port}")
|
|
||||||
|
|
||||||
# 存储活动连接
|
|
||||||
self.active_connections = {}
|
|
||||||
self.running = True
|
|
||||||
|
|
||||||
def register_service(self):
|
|
||||||
"""向协调服务器注册服务"""
|
|
||||||
message = {
|
|
||||||
'action': 'register',
|
|
||||||
'service_name': self.service_name,
|
|
||||||
'internal_port': self.internal_port,
|
|
||||||
'external_port': self.udp_sock.getsockname()[1],
|
|
||||||
'client_id': self.client_id
|
|
||||||
}
|
|
||||||
print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'")
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
|
|
||||||
# 等待响应
|
|
||||||
data, _ = self.udp_sock.recvfrom(4096)
|
|
||||||
response = json.loads(data.decode())
|
|
||||||
if response['status'] == 'success':
|
|
||||||
print(f"服务 '{self.service_name}' 注册成功")
|
|
||||||
else:
|
|
||||||
print(f"注册失败: {response['message']}")
|
|
||||||
|
|
||||||
def udp_listener(self):
|
|
||||||
"""监听UDP消息"""
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
|
||||||
print(f"收到来自 {addr} 的消息: {data.decode()}")
|
|
||||||
message = json.loads(data.decode())
|
|
||||||
|
|
||||||
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
|
|
||||||
# 打洞请求 - 发送响应以确认连通性
|
|
||||||
self.udp_sock.sendto(json.dumps(
|
|
||||||
{'action': 'punch_response',
|
|
||||||
'client_id': message['client_id']
|
|
||||||
}).encode(), addr)
|
|
||||||
print(f"收到来自 {addr} 的打洞请求,已响应")
|
|
||||||
elif message.get('action') == 'punch_response':
|
|
||||||
# 打洞响应 - 确认打洞成功
|
|
||||||
print(f"收到来自 {addr} 的打洞响应")
|
|
||||||
elif message.get('action') == 'connect':
|
|
||||||
# 新的连接请求
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
client_id = message['client_id']
|
|
||||||
print(f"收到来自 {addr} 的连接请求")
|
|
||||||
threading.Thread(
|
|
||||||
target=self.handle_connection,
|
|
||||||
args=(conn_id, addr, client_id),
|
|
||||||
daemon=True
|
|
||||||
).start()
|
|
||||||
elif message.get('action') == 'punch_request':
|
|
||||||
client_addr = message['client_addr']
|
|
||||||
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
|
|
||||||
self.punch_hole(message)
|
|
||||||
elif message.get('action') == 'data':
|
|
||||||
# 接收到来自客户端的数据
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
data = bytes.fromhex(message['data'])
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
# 转发数据到本地服务
|
|
||||||
print(f"收到来自 {addr} 的数据,转发到本地服务")
|
|
||||||
self.active_connections[conn_id]['local_sock'].sendall(data)
|
|
||||||
else:
|
|
||||||
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
|
|
||||||
elif message.get('action') == 'stop_conn':
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
self.active_connections[conn_id]['local_sock'].close()
|
|
||||||
self.active_connections.pop(conn_id)
|
|
||||||
elif message.get('action') == 'stop_client':
|
|
||||||
client_id = message['client_id']
|
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
|
||||||
if conn_info['client_id'] == client_id:
|
|
||||||
conn_info['local_sock'].close()
|
|
||||||
self.active_connections.pop(conn_id)
|
|
||||||
else:
|
|
||||||
print(f"收到未知消息: {message}")
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
pass
|
|
||||||
|
|
||||||
def handle_connection(self, conn_id, client_addr, client_id):
|
|
||||||
"""处理来自客户端的连接"""
|
|
||||||
try:
|
|
||||||
# 接受本地服务连接
|
|
||||||
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
local_sock.connect(('127.0.0.1', self.internal_port))
|
|
||||||
if not local_sock:
|
|
||||||
print("无法连接到本地服务")
|
|
||||||
return
|
|
||||||
|
|
||||||
# 创建与客户端的UDP隧道
|
|
||||||
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
|
|
||||||
|
|
||||||
# 存储连接
|
|
||||||
self.active_connections[conn_id] = {
|
|
||||||
'local_sock': local_sock,
|
|
||||||
'client_addr': client_addr,
|
|
||||||
'client_id': client_id
|
|
||||||
}
|
|
||||||
|
|
||||||
# 通知客户端连接就绪
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'connected',
|
|
||||||
'client_id': conn_id
|
|
||||||
}).encode(), client_addr)
|
|
||||||
|
|
||||||
# 启动数据转发
|
|
||||||
self.forward_data(conn_id, local_sock, client_addr)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"连接失败: {str(e)}")
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'connect_failed',
|
|
||||||
'client_id': conn_id,
|
|
||||||
'message': str(e)
|
|
||||||
}).encode(), client_addr)
|
|
||||||
|
|
||||||
def forward_data(self, conn_id, local_sock, client_addr):
|
|
||||||
"""转发TCP数据到UDP隧道"""
|
|
||||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
# 从本地服务读取数据
|
|
||||||
data = local_sock.recv(65535)
|
|
||||||
if not data:
|
|
||||||
break
|
|
||||||
|
|
||||||
# 通过UDP发送给客户端
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'data',
|
|
||||||
'conn_id': conn_id,
|
|
||||||
'data': data.hex() # 十六进制编码二进制数据
|
|
||||||
}).encode(), client_addr)
|
|
||||||
print(f"转发数据给客户端{client_addr}")
|
|
||||||
# for i in range(0, len(data), CHUNK_SIZE):
|
|
||||||
# chunk = data[i:i + CHUNK_SIZE]
|
|
||||||
# self.udp_sock.sendto(json.dumps({
|
|
||||||
# 'action': 'data',
|
|
||||||
# 'conn_id': conn_id,
|
|
||||||
# 'data': chunk.hex()
|
|
||||||
# }).encode(), client_addr)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"转发数据失败: {str(e)}")
|
|
||||||
finally:
|
|
||||||
local_sock.close()
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
del self.active_connections[conn_id]
|
|
||||||
print(f"连接 {conn_id} 已关闭")
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""运行服务提供端"""
|
|
||||||
# 注册服务
|
|
||||||
self.register_service()
|
|
||||||
|
|
||||||
# 启动UDP监听线程
|
|
||||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
|
||||||
|
|
||||||
# 保持主线程运行
|
|
||||||
try:
|
|
||||||
while self.running:
|
|
||||||
time.sleep(1)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
self.running = False
|
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_conn',
|
|
||||||
'conn_id': conn_id
|
|
||||||
}).encode(), conn_info['client_addr'])
|
|
||||||
self.udp_sock.close()
|
|
||||||
print("服务提供端已停止")
|
|
||||||
|
|
||||||
def punch_hole(self, message):
|
|
||||||
for i in range(5):
|
|
||||||
try:
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'punch',
|
|
||||||
'client_id': message['client_id']
|
|
||||||
}).encode(), tuple(message['client_addr']))
|
|
||||||
time.sleep(0.2)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"打洞失败: {str(e)}")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# 配置信息
|
|
||||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
|
||||||
SERVICE_NAME = "my_game_server"
|
|
||||||
INTERNAL_PORT = 5001 # 内网游戏服务器端口
|
|
||||||
|
|
||||||
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
|
|
||||||
provider.run()
|
|
||||||
44
utiles.py
Normal file
44
utiles.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
import json
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
|
||||||
|
|
||||||
|
class Package:
|
||||||
|
def __init__(self, action: str, message: dict = None, data: bytes = None):
|
||||||
|
self.action = action
|
||||||
|
self.message = message if message else {}
|
||||||
|
self.data = data if data else b''
|
||||||
|
|
||||||
|
def pack(self) -> bytes:
|
||||||
|
action_encode = self.action.encode('utf-8')
|
||||||
|
message_encode = json.dumps(self.message).encode('utf-8')
|
||||||
|
packed = struct.pack('>I', len(action_encode)) + action_encode
|
||||||
|
packed += struct.pack('>I', len(message_encode)) + message_encode
|
||||||
|
packed += self.data
|
||||||
|
return packed
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unpack(cls, packed):
|
||||||
|
action_len = struct.unpack('>I', packed[:4])[0]
|
||||||
|
action = packed[4:4 + action_len].decode('utf-8')
|
||||||
|
message_len = struct.unpack('>I', packed[4 + action_len:8 + action_len])[0]
|
||||||
|
message = json.loads(packed[8 + action_len:8 + action_len + message_len].decode('utf-8'))
|
||||||
|
data = packed[8 + action_len + message_len:]
|
||||||
|
return cls(action, message, data)
|
||||||
|
|
||||||
|
def get_data(self) -> bytes:
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
def get_message(self) -> dict:
|
||||||
|
return self.message
|
||||||
|
|
||||||
|
def get_action(self) -> str:
|
||||||
|
return self.action
|
||||||
|
|
||||||
|
def get_all(self) -> (str, dict, bytes):
|
||||||
|
return self.action, self.message, self.data
|
||||||
|
|
||||||
|
|
||||||
|
def recv_package(sock) -> (tuple, Package):
|
||||||
|
addr, pack = sock.recvfrom(1300)
|
||||||
|
return addr, Package.unpack(pack)
|
||||||
Loading…
Reference in New Issue
Block a user