init
This commit is contained in:
parent
55c2e50c0d
commit
82b355b32b
232
connector.py
232
connector.py
@ -1,232 +0,0 @@
|
||||
import socket
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
||||
class ServiceConnector:
|
||||
def __init__(self, coordinator_addr, service_name, local_port):
|
||||
self.target_id = None
|
||||
self.coordinator_addr = coordinator_addr
|
||||
self.service_name = service_name
|
||||
self.local_port = local_port
|
||||
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# 创建UDP套接字用于协调通信
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind(('0.0.0.0', 0))
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
||||
|
||||
# 创建TCP套接字用于本地监听
|
||||
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.tcp_sock.bind(('127.0.0.1', local_port))
|
||||
self.tcp_sock.listen(5)
|
||||
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
||||
|
||||
# 存储活动连接
|
||||
self.active_connections = {}
|
||||
self.provider_addr = None
|
||||
self.internal_port = None
|
||||
self.running = True
|
||||
|
||||
def request_service(self):
|
||||
"""向协调服务器请求服务"""
|
||||
message = {
|
||||
'action': 'request',
|
||||
'service_name': self.service_name,
|
||||
'client_id': self.client_id
|
||||
}
|
||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
||||
|
||||
# 等待响应
|
||||
data, _ = self.udp_sock.recvfrom(4096)
|
||||
response = json.loads(data.decode())
|
||||
if response['status'] == 'success':
|
||||
self.provider_addr = tuple(response['provider_addr'])
|
||||
self.internal_port = response['internal_port']
|
||||
self.target_id = response['target_id']
|
||||
print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
|
||||
return True
|
||||
else:
|
||||
print(f"服务请求失败: {response['message']}")
|
||||
return False
|
||||
|
||||
def punch_hole(self):
|
||||
"""执行UDP打洞"""
|
||||
if not self.provider_addr:
|
||||
return False
|
||||
|
||||
# 请求打洞
|
||||
message = {
|
||||
'action': 'punch_request',
|
||||
'client_id': self.client_id,
|
||||
'target_id': self.target_id
|
||||
}
|
||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
||||
|
||||
# 等待协调服务器响应
|
||||
data, _ = self.udp_sock.recvfrom(4096)
|
||||
response = json.loads(data.decode())
|
||||
if response['status'] != 'success':
|
||||
print(f"打洞请求失败: {response['message']}")
|
||||
return False
|
||||
|
||||
# 向服务提供者发送打洞包
|
||||
print(f"尝试打洞到 {self.provider_addr}...")
|
||||
for _ in range(5):
|
||||
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
|
||||
time.sleep(0.5)
|
||||
|
||||
# 检查连通性
|
||||
self.udp_sock.settimeout(10.0)
|
||||
try:
|
||||
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
||||
data, addr = self.udp_sock.recvfrom(1024)
|
||||
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
|
||||
print("打洞成功! 已建立UDP连接")
|
||||
return True
|
||||
else:
|
||||
print(f"错误的打洞响应{data}")
|
||||
return False
|
||||
except socket.timeout:
|
||||
print("打洞失败: 未收到响应")
|
||||
return False
|
||||
finally:
|
||||
self.udp_sock.settimeout(None)
|
||||
|
||||
def udp_listener(self):
|
||||
"""监听UDP消息"""
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(65535)
|
||||
message = json.loads(data.decode())
|
||||
|
||||
if message['action'] == 'punch_response':
|
||||
# 打洞响应 - 确认连通性
|
||||
print(f"收到来自 {addr} 的打洞响应")
|
||||
elif message['action'] == 'data':
|
||||
if message['conn_id'] in self.active_connections:
|
||||
# 转发数据到本地客户端
|
||||
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}")
|
||||
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data']))
|
||||
else:
|
||||
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_conn',
|
||||
'conn_id': message['conn_id']
|
||||
}).encode(), addr)
|
||||
elif message['action'] == 'stop_conn':
|
||||
# 停止连接
|
||||
if message['conn_id'] in self.active_connections:
|
||||
self.active_connections[message['conn_id']].close()
|
||||
del self.active_connections[message['conn_id']]
|
||||
print(f"已关闭本地连接 {message['conn_id']}")
|
||||
else:
|
||||
print(f"收到未知消息: {message}")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def tcp_listener(self):
|
||||
"""监听本地TCP连接"""
|
||||
while self.running:
|
||||
try:
|
||||
client_sock, client_addr = self.tcp_sock.accept()
|
||||
print(f"新的本地连接来自 {client_addr}")
|
||||
|
||||
# 为每个连接生成唯一ID
|
||||
conn_id = str(uuid.uuid4())
|
||||
|
||||
# 存储连接
|
||||
self.active_connections[conn_id] = client_sock
|
||||
|
||||
# 请求服务提供者建立连接
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'connect',
|
||||
'client_id': self.client_id,
|
||||
'conn_id': conn_id
|
||||
}).encode(), self.provider_addr)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# 启动数据转发线程
|
||||
threading.Thread(
|
||||
target=self.forward_data,
|
||||
args=(conn_id, client_sock),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
def forward_data(self, conn_id, client_sock):
|
||||
"""转发本地TCP数据到UDP隧道"""
|
||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
||||
try:
|
||||
while True:
|
||||
# 从本地客户端读取数据
|
||||
data = client_sock.recv(65535)
|
||||
if not data:
|
||||
break
|
||||
|
||||
# 通过UDP发送给服务提供者
|
||||
print(f"发送数据到服务提供者: {self.provider_addr}")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'conn_id': conn_id,
|
||||
'data': data.hex() # 十六进制编码二进制数据
|
||||
}).encode(), self.provider_addr)
|
||||
# 分片发送数据
|
||||
# for i in range(0, len(data), CHUNK_SIZE):
|
||||
# chunk = data[i:i + CHUNK_SIZE]
|
||||
# self.udp_sock.sendto(json.dumps({
|
||||
# 'action': 'data',
|
||||
# 'conn_id': conn_id,
|
||||
# 'data': chunk.hex()
|
||||
# }).encode(), self.provider_addr)
|
||||
except:
|
||||
pass
|
||||
finally:
|
||||
client_sock.close()
|
||||
if conn_id in self.active_connections:
|
||||
del self.active_connections[conn_id]
|
||||
|
||||
def run(self):
|
||||
"""运行服务连接端"""
|
||||
# 请求服务
|
||||
if not self.request_service():
|
||||
return
|
||||
|
||||
# 执行打洞
|
||||
if not self.punch_hole():
|
||||
return
|
||||
|
||||
# 启动UDP监听线程
|
||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||
|
||||
# 启动TCP监听线程
|
||||
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
||||
|
||||
# 保持主线程运行
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_client',
|
||||
'client_id': self.client_id
|
||||
}).encode(), self.provider_addr)
|
||||
self.udp_sock.close()
|
||||
self.tcp_sock.close()
|
||||
print("服务连接端已停止")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 配置信息
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
||||
SERVICE_NAME = "my_game_server"
|
||||
LOCAL_PORT = 12345 # 本地映射端口
|
||||
|
||||
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
||||
connector.run()
|
||||
Loading…
Reference in New Issue
Block a user