AwinSimpleP2P/connector.py
2025-05-31 04:32:13 +08:00

318 lines
11 KiB
Python

import socket
import json
import threading
from concurrent.futures import ThreadPoolExecutor
import time
import uuid
import logging
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ServiceConnector:
def __init__(self, coordinator_addr, service_name, local_port):
"""
初始化服务连接器
:param coordinator_addr: 协调服务器地址 (IP, port)
:param service_name: 请求的服务名称
:param local_port: 本地监听端口
"""
self.coordinator_addr = coordinator_addr
self.service_name = service_name
self.local_port = local_port
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
# 创建UDP套接字用于协调通信
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_sock.bind(('0.0.0.0', 0))
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# 创建TCP套接字用于本地监听
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_sock.bind(('127.0.0.1', local_port))
self.tcp_sock.listen(5)
logger.info(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
# 注册线程池
self.thread_pool = ThreadPoolExecutor(max_workers=10)
# 存储活动连接
self.active_connections = {}
self.provider_addr = None
self.provider_id = None
self.internal_port = None
self.running = True
def request_service(self):
"""
向协调服务器请求服务
:return: 请求是否成功
"""
message = {
'action': 'request',
'service_name': self.service_name,
'client_id': self.client_id
}
logger.info(f"向协调服务器 {self.coordinator_addr} 请求服务 '{self.service_name}'")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待响应
try:
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] == 'success':
self.provider_addr = tuple(response['provider_addr'])
self.internal_port = response['internal_port']
self.provider_id = response['provider_id']
logger.info(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
return True
else:
logger.error(f"服务请求失败: {response['message']}")
return False
except Exception as e:
logger.error(f"请求服务时发生错误: {str(e)}")
return False
def punch_hole(self):
"""
执行UDP打洞
:return: 打洞是否成功
"""
if not self.provider_addr:
return False
# 请求协服务器发起打洞
message = {
'action': 'punch_request',
'client_id': self.client_id,
'provider_id': self.provider_id
}
logger.info(f"向协调服务器 {self.coordinator_addr} 请求打洞到 {self.provider_addr}")
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
# 等待协调服务器响应
try:
data, _ = self.udp_sock.recvfrom(4096)
response = json.loads(data.decode())
if response['status'] != 'success':
logger.error(f"打洞请求失败: {response['message']}")
return False
# 向服务提供者发送打洞包
logger.info(f"尝试打洞到 {self.provider_addr}...")
for _ in range(10):
self.udp_sock.sendto(json.dumps({
'action': 'punch',
'client_id': self.client_id
}).encode(), self.provider_addr)
time.sleep(0.2)
# 检查连通性
self.udp_sock.settimeout(10.0)
try:
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
data, addr = self.udp_sock.recvfrom(4096)
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
logger.info("打洞成功! 已建立UDP连接")
return True
else:
logger.error(f"错误的打洞响应{data}")
return False
except socket.timeout:
logger.error("打洞失败: 未收到响应")
return False
finally:
self.udp_sock.settimeout(None)
def handle_punch_response(self, message, addr):
"""
处理打洞响应
:param message: 打洞响应消息
:param addr: 服务提供者地址
"""
logger.debug(f"收到来自 {addr} 的打洞响应")
def handle_stop_conn(self, message, addr):
"""
处理停止连接请求
:param message: 停止连接请求消息
:param addr: 服务提供者地址
"""
conn_id = message['conn_id']
if conn_id in self.active_connections:
self.active_connections[conn_id].close()
self.active_connections.pop(conn_id, None)
logger.debug(f"关闭本地连接 {conn_id}")
def tcp_listener(self):
"""
监听本地TCP连接
"""
while self.running:
try:
client_sock, client_addr = self.tcp_sock.accept()
logger.debug(f"新的本地连接来自 {client_addr}")
# 为每个连接生成唯一ID
conn_id = str(uuid.uuid4())
# 存储连接
self.active_connections[conn_id] = client_sock
# 请求服务提供者建立连接
self.udp_sock.sendto(json.dumps({
'action': 'connect',
'client_id': self.client_id,
'conn_id': conn_id,
'service_name': self.service_name
}).encode(), self.provider_addr)
time.sleep(0.5)
# 启动数据转发线程
threading.Thread(
target=self.forward_data,
args=(conn_id, client_sock),
daemon=True
).start()
except Exception as e:
logger.error(f"处理本地连接时发生错误: {str(e)}")
def handel_punch(self, message, addr):
"""
处理UDP打洞请求
:param message: 打洞请求消息
:param addr: 服务提供者地址
"""
logger.debug(f"收到来自 {addr} 的UDP打洞请求")
self.udp_sock.sendto(json.dumps({
'action': 'punch_response',
'client_id': self.client_id,
'provider_id': self.provider_id
}).encode(), addr)
def handle_data(self, message, addr):
"""
处理数据消息
:param message: 数据消息
:param addr: 服务提供者地址
"""
conn_id = message['conn_id']
data = bytes.fromhex(message['data'])
if conn_id in self.active_connections:
# 转发数据到本地客户端
logger.debug(f"收到来自 {addr} 的数据,转发到本地连接 {conn_id}")
self.active_connections[conn_id].sendall(data)
else:
self.udp_sock.sendto(json.dumps({'action': 'stop_conn', 'conn_id': conn_id}).encode(), addr)
logger.debug(f"收到来自 {addr} 的数据,但未找到对应的本地连接")
def forward_data(self, conn_id, client_sock):
"""
转发本地TCP数据到UDP隧道
:param conn_id: 连接ID
:param client_sock: 本地客户端套接字
"""
try:
while True:
# 从本地客户端读取数据
data = client_sock.recv(4096)
if not data:
break
# 通过UDP发送给服务提供者
logger.debug(f"发送数据到服务提供者: {self.provider_addr}")
self.udp_sock.sendto(json.dumps({
'action': 'data',
'conn_id': conn_id,
'data': data.hex() # 十六进制编码二进制数据
}).encode(), self.provider_addr)
except Exception as e:
logger.error(f"转发数据失败: {str(e)}")
finally:
client_sock.close()
if conn_id in self.active_connections:
del self.active_connections[conn_id]
self.udp_sock.sendto(json.dumps({
'action': 'stop_conn',
'conn_id': conn_id
}).encode(), self.provider_addr)
logger.debug(f"连接 {conn_id} 已关闭")
def udp_listener(self):
"""
监听UDP消息并处理
"""
while self.running:
try:
data, addr = self.udp_sock.recvfrom(65535)
logger.debug(f"收到来自 {addr} 的消息: {data}")
message = json.loads(data.decode())
# 使用字典映射处理不同消息类型
action_handlers = {
'punch_response': self.handle_punch_response,
'data': self.handle_data,
'stop_conn': self.handle_stop_conn,
'punch': self.handel_punch
}
# 提交任务到线程池
if message.get('action') in action_handlers:
self.thread_pool.submit(action_handlers[message['action']], message, addr)
else:
logger.warning(f"收到未知消息: {message}")
except Exception as e:
logger.error(f"处理UDP消息时发生错误: {str(e)}")
def run(self):
"""
运行服务连接端
"""
# 请求服务
if not self.request_service():
logger.error("服务请求失败,退出程序")
return
# 执行打洞
if not self.punch_hole():
logger.error("打洞失败,退出程序")
return
# 启动UDP监听线程
threading.Thread(target=self.udp_listener, daemon=True).start()
# 启动TCP监听线程
threading.Thread(target=self.tcp_listener, daemon=True).start()
# 保持主线程运行
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
self.running = False
self.udp_sock.sendto(json.dumps({
'action': 'stop_client',
'client_id': self.client_id
}).encode(), self.provider_addr)
self.udp_sock.close()
self.tcp_sock.close()
logger.info("服务连接端已停止")
if __name__ == '__main__':
# 配置信息
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
SERVICE_NAME = "ssh-jk-54htrsd324n6"
# SERVICE_NAME = "terraria-jk-2cxht5"
# SERVICE_NAME = "minecraft-jk-ytsvb54u6"
# SERVICE_NAME = "alist-jk-5shf43h6fdg"
LOCAL_PORT = 12345 # 本地映射端口
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
connector.run()