tcp
This commit is contained in:
parent
67d9fdc492
commit
9c4a6af1bd
415
connector.py
415
connector.py
@ -1,317 +1,178 @@
|
|||||||
import socket
|
import socket
|
||||||
import json
|
|
||||||
import threading
|
import threading
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
import json
|
||||||
|
import hashlib
|
||||||
|
import struct
|
||||||
import time
|
import time
|
||||||
import uuid
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# 配置日志
|
|
||||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceConnector:
|
class Connector:
|
||||||
def __init__(self, coordinator_addr, service_name, local_port):
|
def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000, local_port=2222):
|
||||||
"""
|
self.coordinator_host = coordinator_host
|
||||||
初始化服务连接器
|
self.coordinator_port = coordinator_port
|
||||||
:param coordinator_addr: 协调服务器地址 (IP, port)
|
|
||||||
:param service_name: 请求的服务名称
|
|
||||||
:param local_port: 本地监听端口
|
|
||||||
"""
|
|
||||||
self.coordinator_addr = coordinator_addr
|
|
||||||
self.service_name = service_name
|
|
||||||
self.local_port = local_port
|
self.local_port = local_port
|
||||||
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
self.token = None
|
||||||
|
self.connections = {}
|
||||||
|
self.conn_counter = 0
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
# 创建UDP套接字用于协调通信
|
def connect_to_coordinator(self):
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
self.udp_sock.bind(('0.0.0.0', 0))
|
self.coord_conn.connect((self.coordinator_host, self.coordinator_port))
|
||||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
|
||||||
|
|
||||||
# 创建TCP套接字用于本地监听
|
# Login
|
||||||
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
self._send_json({'action': 'login', 'account': 'admin'})
|
||||||
self.tcp_sock.bind(('127.0.0.1', local_port))
|
response = self._recv_json()
|
||||||
self.tcp_sock.listen(5)
|
|
||||||
logger.info(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
|
||||||
|
|
||||||
# 注册线程池
|
if response.get('status') == 'salt':
|
||||||
self.thread_pool = ThreadPoolExecutor(max_workers=10)
|
salt = response['salt']
|
||||||
|
password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest()
|
||||||
|
self._send_json({'action': 'auth', 'hash': password_hash})
|
||||||
|
response = self._recv_json()
|
||||||
|
|
||||||
# 存储活动连接
|
if response.get('status') == 'success':
|
||||||
self.active_connections = {}
|
self.token = response['token']
|
||||||
self.provider_addr = None
|
print(f"Authenticated. Token: {self.token}")
|
||||||
self.provider_id = None
|
|
||||||
self.internal_port = None
|
|
||||||
self.running = True
|
|
||||||
|
|
||||||
def request_service(self):
|
|
||||||
"""
|
|
||||||
向协调服务器请求服务
|
|
||||||
:return: 请求是否成功
|
|
||||||
"""
|
|
||||||
message = {
|
|
||||||
'action': 'request',
|
|
||||||
'service_name': self.service_name,
|
|
||||||
'client_id': self.client_id
|
|
||||||
}
|
|
||||||
logger.info(f"向协调服务器 {self.coordinator_addr} 请求服务 '{self.service_name}'")
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
|
|
||||||
# 等待响应
|
|
||||||
try:
|
|
||||||
data, _ = self.udp_sock.recvfrom(4096)
|
|
||||||
response = json.loads(data.decode())
|
|
||||||
if response['status'] == 'success':
|
|
||||||
self.provider_addr = tuple(response['provider_addr'])
|
|
||||||
self.internal_port = response['internal_port']
|
|
||||||
self.provider_id = response['provider_id']
|
|
||||||
logger.info(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
|
|
||||||
return True
|
return True
|
||||||
else:
|
print("Connection to coordinator failed")
|
||||||
logger.error(f"服务请求失败: {response['message']}")
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"请求服务时发生错误: {str(e)}")
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def punch_hole(self):
|
def request_service(self, service_name):
|
||||||
"""
|
self._send_json({
|
||||||
执行UDP打洞
|
'action': 'request_service',
|
||||||
:return: 打洞是否成功
|
'service_name': service_name,
|
||||||
"""
|
'token': self.token
|
||||||
if not self.provider_addr:
|
})
|
||||||
return False
|
response = self._recv_json()
|
||||||
|
|
||||||
# 请求协服务器发起打洞
|
if response.get('status') == 'success':
|
||||||
message = {
|
provider_addr = tuple(response['provider_addr'])
|
||||||
'action': 'punch_request',
|
print(f"Connecting to provider at {provider_addr}")
|
||||||
'client_id': self.client_id,
|
|
||||||
'provider_id': self.provider_id
|
|
||||||
}
|
|
||||||
logger.info(f"向协调服务器 {self.coordinator_addr} 请求打洞到 {self.provider_addr}")
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
|
|
||||||
# 等待协调服务器响应
|
# 使用UDP打洞
|
||||||
try:
|
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
data, _ = self.udp_sock.recvfrom(4096)
|
# 绑定到相同的本地端口(用于后续TCP连接)
|
||||||
response = json.loads(data.decode())
|
udp_socket.bind(('0.0.0.0', 0))
|
||||||
if response['status'] != 'success':
|
punch_port = udp_socket.getsockname()[1]
|
||||||
logger.error(f"打洞请求失败: {response['message']}")
|
# 向对方发送打洞包
|
||||||
return False
|
for i in range(10):
|
||||||
|
udp_socket.sendto(b'punch', provider_addr)
|
||||||
# 向服务提供者发送打洞包
|
|
||||||
logger.info(f"尝试打洞到 {self.provider_addr}...")
|
|
||||||
for _ in range(10):
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'punch',
|
|
||||||
'client_id': self.client_id
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
# 检查连通性
|
# Start listening for incoming connections from provider
|
||||||
self.udp_sock.settimeout(10.0)
|
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
try:
|
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
listener.bind(('0.0.0.0', punch_port))
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
listener.listen(5)
|
||||||
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
|
print(f"Listening on port {punch_port} for provider connections")
|
||||||
logger.info("打洞成功! 已建立UDP连接")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logger.error(f"错误的打洞响应{data}")
|
|
||||||
return False
|
|
||||||
except socket.timeout:
|
|
||||||
logger.error("打洞失败: 未收到响应")
|
|
||||||
return False
|
|
||||||
finally:
|
|
||||||
self.udp_sock.settimeout(None)
|
|
||||||
|
|
||||||
def handle_punch_response(self, message, addr):
|
# Start handler thread to accept provider's connection
|
||||||
"""
|
|
||||||
处理打洞响应
|
|
||||||
:param message: 打洞响应消息
|
|
||||||
:param addr: 服务提供者地址
|
|
||||||
"""
|
|
||||||
logger.debug(f"收到来自 {addr} 的打洞响应")
|
|
||||||
|
|
||||||
def handle_stop_conn(self, message, addr):
|
|
||||||
"""
|
|
||||||
处理停止连接请求
|
|
||||||
:param message: 停止连接请求消息
|
|
||||||
:param addr: 服务提供者地址
|
|
||||||
"""
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
self.active_connections[conn_id].close()
|
|
||||||
self.active_connections.pop(conn_id, None)
|
|
||||||
logger.debug(f"关闭本地连接 {conn_id}")
|
|
||||||
|
|
||||||
def tcp_listener(self):
|
|
||||||
"""
|
|
||||||
监听本地TCP连接
|
|
||||||
"""
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
client_sock, client_addr = self.tcp_sock.accept()
|
|
||||||
logger.debug(f"新的本地连接来自 {client_addr}")
|
|
||||||
|
|
||||||
# 为每个连接生成唯一ID
|
|
||||||
conn_id = str(uuid.uuid4())
|
|
||||||
|
|
||||||
# 存储连接
|
|
||||||
self.active_connections[conn_id] = client_sock
|
|
||||||
|
|
||||||
# 请求服务提供者建立连接
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'connect',
|
|
||||||
'client_id': self.client_id,
|
|
||||||
'conn_id': conn_id,
|
|
||||||
'service_name': self.service_name
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
|
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
# 启动数据转发线程
|
|
||||||
threading.Thread(
|
threading.Thread(
|
||||||
target=self.forward_data,
|
target=self.handle_provider_connection,
|
||||||
args=(conn_id, client_sock),
|
args=(listener, service_name),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
return True
|
||||||
|
print("Failed to request service")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def handle_provider_connection(self, listener, service_name):
|
||||||
|
# Accept connection from provider
|
||||||
|
try:
|
||||||
|
provider_sock, addr = listener.accept()
|
||||||
|
print(f"Accepted provider connection from {addr}")
|
||||||
|
# Start heartbeat monitoring
|
||||||
|
threading.Thread(
|
||||||
|
target=self.monitor_heartbeats,
|
||||||
|
args=(provider_sock,),
|
||||||
daemon=True
|
daemon=True
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
except Exception as e:
|
# Start client listener to accept local clients
|
||||||
logger.error(f"处理本地连接时发生错误: {str(e)}")
|
client_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
client_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
client_listener.bind(('0.0.0.0', self.local_port))
|
||||||
|
client_listener.listen(5)
|
||||||
|
print(f"Client listener started on port {self.local_port}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
client_sock, addr = client_listener.accept()
|
||||||
|
print(f"New client from {addr}")
|
||||||
|
with self.lock:
|
||||||
|
conn_id = self.conn_counter
|
||||||
|
self.conn_counter += 1
|
||||||
|
|
||||||
|
threading.Thread(
|
||||||
|
target=self.handle_client_connection,
|
||||||
|
args=(client_sock, provider_sock, conn_id),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
finally:
|
||||||
|
client_listener.close()
|
||||||
|
finally:
|
||||||
|
listener.close()
|
||||||
|
|
||||||
|
def handle_client_connection(self, client_sock, provider_sock, conn_id):
|
||||||
|
self.connections[conn_id] = client_sock
|
||||||
|
|
||||||
def handel_punch(self, message, addr):
|
|
||||||
"""
|
|
||||||
处理UDP打洞请求
|
|
||||||
:param message: 打洞请求消息
|
|
||||||
:param addr: 服务提供者地址
|
|
||||||
"""
|
|
||||||
logger.debug(f"收到来自 {addr} 的UDP打洞请求")
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'punch_response',
|
|
||||||
'client_id': self.client_id,
|
|
||||||
'provider_id': self.provider_id
|
|
||||||
}).encode(), addr)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_data(self, message, addr):
|
|
||||||
"""
|
|
||||||
处理数据消息
|
|
||||||
:param message: 数据消息
|
|
||||||
:param addr: 服务提供者地址
|
|
||||||
"""
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
data = bytes.fromhex(message['data'])
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
# 转发数据到本地客户端
|
|
||||||
logger.debug(f"收到来自 {addr} 的数据,转发到本地连接 {conn_id}")
|
|
||||||
self.active_connections[conn_id].sendall(data)
|
|
||||||
else:
|
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'stop_conn', 'conn_id': conn_id}).encode(), addr)
|
|
||||||
logger.debug(f"收到来自 {addr} 的数据,但未找到对应的本地连接")
|
|
||||||
|
|
||||||
def forward_data(self, conn_id, client_sock):
|
|
||||||
"""
|
|
||||||
转发本地TCP数据到UDP隧道
|
|
||||||
:param conn_id: 连接ID
|
|
||||||
:param client_sock: 本地客户端套接字
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
# 从本地客户端读取数据
|
|
||||||
data = client_sock.recv(4096)
|
data = client_sock.recv(4096)
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
|
header = struct.pack("!I B", conn_id, len(data))
|
||||||
# 通过UDP发送给服务提供者
|
provider_sock.sendall(header + data)
|
||||||
logger.debug(f"发送数据到服务提供者: {self.provider_addr}")
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'data',
|
|
||||||
'conn_id': conn_id,
|
|
||||||
'data': data.hex() # 十六进制编码二进制数据
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"转发数据失败: {str(e)}")
|
|
||||||
finally:
|
finally:
|
||||||
client_sock.close()
|
client_sock.close()
|
||||||
if conn_id in self.active_connections:
|
with self.lock:
|
||||||
del self.active_connections[conn_id]
|
if conn_id in self.connections:
|
||||||
self.udp_sock.sendto(json.dumps({
|
del self.connections[conn_id]
|
||||||
'action': 'stop_conn',
|
|
||||||
'conn_id': conn_id
|
|
||||||
}).encode(), self.provider_addr)
|
|
||||||
logger.debug(f"连接 {conn_id} 已关闭")
|
|
||||||
|
|
||||||
def udp_listener(self):
|
def monitor_heartbeats(self, sock):
|
||||||
"""
|
last_heartbeat = time.time()
|
||||||
监听UDP消息并处理
|
while True:
|
||||||
"""
|
|
||||||
while self.running:
|
|
||||||
try:
|
try:
|
||||||
data, addr = self.udp_sock.recvfrom(65535)
|
header = sock.recv(5)
|
||||||
logger.debug(f"收到来自 {addr} 的消息: {data}")
|
if not header:
|
||||||
message = json.loads(data.decode())
|
break
|
||||||
|
|
||||||
# 使用字典映射处理不同消息类型
|
conn_id, data_len = struct.unpack("!I B", header)
|
||||||
action_handlers = {
|
data = sock.recv(data_len) if data_len > 0 else b''
|
||||||
'punch_response': self.handle_punch_response,
|
|
||||||
'data': self.handle_data,
|
|
||||||
'stop_conn': self.handle_stop_conn,
|
|
||||||
'punch': self.handel_punch
|
|
||||||
}
|
|
||||||
|
|
||||||
# 提交任务到线程池
|
# Check if heartbeat
|
||||||
if message.get('action') in action_handlers:
|
if conn_id == 0 and data_len == 0:
|
||||||
self.thread_pool.submit(action_handlers[message['action']], message, addr)
|
last_heartbeat = time.time()
|
||||||
else:
|
continue
|
||||||
logger.warning(f"收到未知消息: {message}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"处理UDP消息时发生错误: {str(e)}")
|
|
||||||
|
|
||||||
def run(self):
|
# Forward data to client
|
||||||
"""
|
with self.lock:
|
||||||
运行服务连接端
|
if conn_id in self.connections:
|
||||||
"""
|
self.connections[conn_id].sendall(data)
|
||||||
# 请求服务
|
except ConnectionResetError:
|
||||||
if not self.request_service():
|
break
|
||||||
logger.error("服务请求失败,退出程序")
|
|
||||||
|
# Check heartbeat timeout
|
||||||
|
if time.time() - last_heartbeat > 10:
|
||||||
|
print("Heartbeat timeout")
|
||||||
|
break
|
||||||
|
|
||||||
|
def start(self, service_name='ssh'):
|
||||||
|
if not self.connect_to_coordinator():
|
||||||
return
|
return
|
||||||
|
|
||||||
# 执行打洞
|
if self.request_service(service_name):
|
||||||
if not self.punch_hole():
|
while True:
|
||||||
logger.error("打洞失败,退出程序")
|
|
||||||
return
|
|
||||||
|
|
||||||
# 启动UDP监听线程
|
|
||||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
|
||||||
|
|
||||||
# 启动TCP监听线程
|
|
||||||
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
|
||||||
|
|
||||||
# 保持主线程运行
|
|
||||||
try:
|
|
||||||
while self.running:
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
except KeyboardInterrupt:
|
|
||||||
self.running = False
|
def _send_json(self, data):
|
||||||
self.udp_sock.sendto(json.dumps({
|
self.coord_conn.sendall(json.dumps(data).encode())
|
||||||
'action': 'stop_client',
|
|
||||||
'client_id': self.client_id
|
def _recv_json(self):
|
||||||
}).encode(), self.provider_addr)
|
data = self.coord_conn.recv(4096)
|
||||||
self.udp_sock.close()
|
return json.loads(data.decode()) if data else None
|
||||||
self.tcp_sock.close()
|
|
||||||
logger.info("服务连接端已停止")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
# 配置信息
|
connector = Connector(coordinator_host='www.awin-x.top',local_port=2222)
|
||||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
connector.start(service_name='ssh')
|
||||||
SERVICE_NAME = "ssh-jk-54htrsd324n6"
|
|
||||||
# SERVICE_NAME = "terraria-jk-2cxht5"
|
|
||||||
# SERVICE_NAME = "minecraft-jk-ytsvb54u6"
|
|
||||||
# SERVICE_NAME = "alist-jk-5shf43h6fdg"
|
|
||||||
LOCAL_PORT = 12345 # 本地映射端口
|
|
||||||
|
|
||||||
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
|
||||||
connector.run()
|
|
||||||
387
coordinator.py
387
coordinator.py
@ -1,259 +1,178 @@
|
|||||||
import socket
|
import socket
|
||||||
import json
|
|
||||||
import time
|
|
||||||
import logging
|
|
||||||
from collections import defaultdict
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
import threading
|
import threading
|
||||||
|
import json
|
||||||
# 设置日志配置
|
import os
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
import hashlib
|
||||||
logger = logging.getLogger(__name__)
|
import secrets
|
||||||
|
import time
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
|
||||||
class CoordinatorServer:
|
class Coordinator:
|
||||||
def __init__(self, host='0.0.0.0', port=5000):
|
def __init__(self, host='0.0.0.0', port=5000):
|
||||||
"""
|
# 初始化协调器服务端参数
|
||||||
初始化协调服务器。
|
self.host = host # 监听地址
|
||||||
:param host: 服务器绑定的主机地址,默认为 '0.0.0.0'。
|
self.port = port # 监听端口
|
||||||
:param port: 服务器监听的端口号,默认为 5000。
|
# 生成盐值用于密码加密
|
||||||
"""
|
self.salt = secrets.token_hex(8)
|
||||||
self.host = host
|
# 存储管理员密码哈希值(盐+密码)
|
||||||
self.port = port
|
self.stored_hash = hashlib.sha256((self.salt + "admin_password").encode()).hexdigest()
|
||||||
self.clients = defaultdict(dict) # 客户端信息字典
|
# 存储用户令牌信息
|
||||||
self.providers = defaultdict(dict) # 服务提供端信息字典
|
self.tokens = {}
|
||||||
self.services = defaultdict(tuple) # 服务名称与提供者信息的映射
|
# 存储服务注册信息,格式:{token: {services: [], addr: (), conn: socket}}
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.services = defaultdict(dict)
|
||||||
self.udp_sock.bind((host, port))
|
# 活动连接池(当前未使用)
|
||||||
self.udp_sock.settimeout(1) # 设置 UDP 套接字接收超时时间为 1 秒
|
self.active_connections = {}
|
||||||
self.running = True
|
# 线程锁保证数据安全
|
||||||
self.executor = ThreadPoolExecutor(max_workers=10) # 创建线程池
|
self.lock = threading.Lock()
|
||||||
logger.debug(f"协调服务器运行在 {host}:{port}")
|
|
||||||
|
|
||||||
# 启动定时清理任务
|
def handle_client(self, conn, addr):
|
||||||
self.executor.submit(self.cleanup_expired_services)
|
# 处理客户端连接
|
||||||
|
print(f"New connection from {addr}")
|
||||||
|
token = None
|
||||||
|
salt = secrets.token_hex(8)
|
||||||
|
stored_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest()
|
||||||
|
|
||||||
def handle_register(self, data, addr):
|
|
||||||
"""
|
|
||||||
处理服务注册请求。
|
|
||||||
:param data: 包含服务注册信息的字典。
|
|
||||||
:param addr: 发送请求的客户端地址。
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
provider_id = data['provider_id']
|
while True:
|
||||||
services = data['services']
|
# 接收客户端JSON数据
|
||||||
|
data = self.recv_json(conn)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
|
||||||
# 记录客户端信息
|
action = data.get('action')
|
||||||
self.providers[provider_id] = {
|
|
||||||
'addr': addr,
|
# 登录流程:发送盐值
|
||||||
'services': services,
|
if action == 'login':
|
||||||
'last_seen': time.time()
|
if data.get('account') == 'admin':
|
||||||
|
response = {'status': 'salt', 'salt': salt}
|
||||||
|
self.send_json(conn, response)
|
||||||
|
else:
|
||||||
|
self.send_json(conn, {'status': 'error', 'message': 'Invalid account'})
|
||||||
|
|
||||||
|
# 认证流程:验证密码哈希
|
||||||
|
elif action == 'auth':
|
||||||
|
if data.get('hash') == stored_hash:
|
||||||
|
# 生成访问令牌(有效期1小时)
|
||||||
|
token = secrets.token_hex(8)
|
||||||
|
with self.lock:
|
||||||
|
self.tokens[token] = {
|
||||||
|
'ip': addr[0],
|
||||||
|
'expiry': time.time() + 3600 # 令牌过期时间
|
||||||
}
|
}
|
||||||
|
response = {'status': 'success', 'token': token, 'message': 'Login successful'}
|
||||||
|
self.send_json(conn, response)
|
||||||
|
else:
|
||||||
|
self.send_json(conn, {'status': 'error', 'message': 'Authentication failed'})
|
||||||
|
|
||||||
# 遍历 services 字典,记录每个服务的名称和端口
|
# 服务注册流程
|
||||||
for service_name, service_port in services.items():
|
elif action == 'register_service':
|
||||||
self.services[service_name] = (addr, service_port, provider_id)
|
client_token = data.get('token')
|
||||||
|
if self.validate_token(client_token, addr[0]):
|
||||||
logger.info(f"注册来自{addr}:{services}")
|
services = data.get('services', [])
|
||||||
|
with self.lock:
|
||||||
# 直接回复注册成功消息
|
self.services[client_token] = {
|
||||||
response = {'status': 'success', 'message': '服务注册成功'}
|
'services': services, # 支持的服务列表
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
'addr': addr, # 客户端地址信息
|
||||||
except Exception as e:
|
'conn': conn # 客户端连接套接字
|
||||||
response = {'status': 'error', 'message': str(e)}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
def handle_request(self, data, addr):
|
|
||||||
"""
|
|
||||||
处理服务请求。
|
|
||||||
:param data: 包含服务请求信息的字典。
|
|
||||||
:param addr: 发送请求的客户端地址。
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
service_name = data['service_name']
|
|
||||||
client_id = data['client_id']
|
|
||||||
|
|
||||||
if service_name not in self.services:
|
|
||||||
response = {'status': 'error', 'message': '服务未找到'}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
# 记录请求客户端信息
|
|
||||||
self.clients[client_id] = {
|
|
||||||
'addr': addr,
|
|
||||||
'service_name': service_name,
|
|
||||||
'last_seen': time.time()
|
|
||||||
}
|
}
|
||||||
|
self.send_json(conn, {'status': 'success', 'message': 'Services registered'})
|
||||||
|
else:
|
||||||
|
self.send_json(conn, {'status': 'error', 'message': 'Invalid token'})
|
||||||
|
|
||||||
# 获取服务提供者的信息
|
# 服务请求流程
|
||||||
provider_addr, internal_port, provider_id = self.services[service_name]
|
elif action == 'request_service':
|
||||||
|
client_token = data.get('token')
|
||||||
|
if not self.validate_token(client_token, addr[0]):
|
||||||
|
self.send_json(conn, {'status': 'error', 'message': 'Invalid token'})
|
||||||
|
continue
|
||||||
|
|
||||||
logger.debug(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
|
service_name = data.get('service_name')
|
||||||
|
provider_token = self.find_service_provider(service_name)
|
||||||
|
|
||||||
response = {
|
if provider_token:
|
||||||
'status': 'success',
|
provider_info = self.services[provider_token]
|
||||||
'provider_addr': provider_addr,
|
provider_addr = provider_info['addr']
|
||||||
'internal_port': internal_port,
|
connector_addr = addr
|
||||||
'provider_id': provider_id
|
|
||||||
}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
except Exception as e:
|
|
||||||
response = {'status': 'error', 'message': str(e)}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
def handle_punch_request(self, data, addr):
|
# 通知服务提供方进行NAT打洞
|
||||||
"""
|
punch_msg = {
|
||||||
处理打洞请求。
|
|
||||||
:param data: 包含打洞请求信息的字典。
|
|
||||||
:param addr: 发送请求的客户端地址。
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
client_id = data['client_id']
|
|
||||||
provider_id = data['provider_id']
|
|
||||||
|
|
||||||
# 获取目标客户端信息
|
|
||||||
if provider_id not in self.providers:
|
|
||||||
response = {'status': 'error', 'message': '目标提供端未找到'}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
provider_addr = self.providers[provider_id]['addr']
|
|
||||||
|
|
||||||
logger.info(f"打洞请求: {addr} -> {provider_id}")
|
|
||||||
|
|
||||||
# 通知双方对方的地址
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'punch_request',
|
'action': 'punch_request',
|
||||||
'client_id': client_id,
|
'connector_addr': connector_addr, # 请求方地址
|
||||||
'client_addr': addr
|
'service_name': service_name # 请求的服务名称
|
||||||
}).encode(), provider_addr)
|
|
||||||
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'status': 'success',
|
|
||||||
'provider_addr': provider_addr
|
|
||||||
}).encode(), addr)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"处理打洞请求时出错: {e}")
|
|
||||||
response = {'status': 'error', 'message': str(e)}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
def handle_stop_provider(self, data, addr):
|
|
||||||
"""
|
|
||||||
处理停止服务请求。
|
|
||||||
:param data: 包含停止服务信息的字典。
|
|
||||||
:param addr: 发送请求的客户端地址。
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
service_name = data['service_name']
|
|
||||||
self.services.pop(service_name, None)
|
|
||||||
response = {'status': 'success', 'message': '服务停止成功'}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
except Exception as e:
|
|
||||||
response = {'status': 'error', 'message': str(e)}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
def handle_heartbeat(self, data, addr):
|
|
||||||
"""
|
|
||||||
处理心跳包。
|
|
||||||
:param data: 包含心跳信息的字典。
|
|
||||||
:param addr: 发送心跳的客户端地址。
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
provider_id = data['provider_id']
|
|
||||||
self.providers[provider_id]['last_seen'] = time.time()
|
|
||||||
response = {'status': 'success', 'message': '心跳更新成功'}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
except Exception as e:
|
|
||||||
response = {'status': 'error', 'message': str(e)}
|
|
||||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
|
||||||
|
|
||||||
def cleanup_expired_services(self):
|
|
||||||
"""
|
|
||||||
定时清理过期服务。
|
|
||||||
每20秒检查一次,移除超过30秒未更新心跳的服务。
|
|
||||||
"""
|
|
||||||
while self.running:
|
|
||||||
time.sleep(60) # 每60秒检查一次
|
|
||||||
current_time = time.time()
|
|
||||||
expired_providers = []
|
|
||||||
for provider_id, provider in self.providers.items():
|
|
||||||
if current_time - provider['last_seen'] > 60: # 心跳包最后更新时间大于30秒
|
|
||||||
expired_providers.append(provider_id)
|
|
||||||
logger.info(f"服务过期: {provider['addr']}")
|
|
||||||
for provider_id in expired_providers:
|
|
||||||
provider = self.providers[provider_id]
|
|
||||||
for service_name in provider['services'].keys(): # 使用新的 services 字段
|
|
||||||
self.services.pop(service_name, None)
|
|
||||||
self.providers.pop(provider_id, None)
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""
|
|
||||||
运行协调服务器。
|
|
||||||
"""
|
|
||||||
logger.info(f"协调服务器已启动,端口{self.udp_sock.getsockname()[1]},等待连接...")
|
|
||||||
action_handlers = {
|
|
||||||
'register': self.handle_register, # 服务注册处理行为
|
|
||||||
'request': self.handle_request, # 服务请求处理行为
|
|
||||||
'punch_request': self.handle_punch_request, # 打洞处理行为
|
|
||||||
'stop_provider': self.handle_stop_provider, # 停止服务行为
|
|
||||||
'heartbeat': self.handle_heartbeat # 心跳处理行为
|
|
||||||
}
|
}
|
||||||
|
self.send_json(provider_info['conn'], punch_msg)
|
||||||
|
|
||||||
while self.running:
|
# 响应请求方
|
||||||
try:
|
self.send_json(conn, {
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
'status': 'success',
|
||||||
try:
|
'provider_addr': provider_addr # 提供方地址信息
|
||||||
message = json.loads(data.decode())
|
})
|
||||||
action = message.get('action')
|
|
||||||
|
|
||||||
handler = action_handlers.get(action)
|
# 使用后立即销毁令牌
|
||||||
if handler: # 如果存在对应的处理行为,则执行它
|
with self.lock:
|
||||||
self.executor.submit(handler, message, addr).result()
|
if client_token in self.tokens:
|
||||||
else: # 如果没有对应的处理行为,则返回错误响应
|
del self.tokens[client_token]
|
||||||
self.udp_sock.sendto(json.dumps({
|
else:
|
||||||
'status': 'error',
|
self.send_json(conn, {'status': 'error', 'message': 'Service not available'})
|
||||||
'message': '无效操作'}).encode(), addr)
|
except (ConnectionResetError, json.JSONDecodeError):
|
||||||
except json.JSONDecodeError:
|
pass
|
||||||
self.udp_sock.sendto(json.dumps({
|
finally:
|
||||||
'status': 'error',
|
conn.close()
|
||||||
'message': '无效的JSON数据'
|
print(f"Connection closed: {addr}")
|
||||||
}).encode(), addr)
|
# 清理资源
|
||||||
except socket.timeout: # 捕获 UDP 接收超时异常
|
if token:
|
||||||
pass # 不做任何处理,允许主线程继续执行
|
with self.lock:
|
||||||
except Exception as e:
|
if token in self.tokens:
|
||||||
logger.debug(f"服务器错误: {str(e)}")
|
del self.tokens[token]
|
||||||
|
if token in self.services:
|
||||||
|
del self.services[token]
|
||||||
|
|
||||||
|
def validate_token(self, token, ip):
|
||||||
|
# 验证令牌有效性:存在、IP匹配、未过期
|
||||||
|
with self.lock:
|
||||||
|
token_info = self.tokens.get(token)
|
||||||
|
if token_info and token_info['ip'] == ip and token_info['expiry'] > time.time():
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def find_service_provider(self, service_name):
|
||||||
|
# 查找可用服务提供者
|
||||||
|
for token, info in self.services.items():
|
||||||
|
if service_name in info['services']:
|
||||||
|
return token
|
||||||
|
return None
|
||||||
|
|
||||||
|
def recv_json(self, conn):
|
||||||
|
# 接收并解析JSON数据
|
||||||
|
data = conn.recv(4096)
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
return json.loads(data.decode())
|
||||||
|
|
||||||
|
def send_json(self, conn, data):
|
||||||
|
# 发送JSON数据
|
||||||
|
conn.sendall(json.dumps(data).encode())
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""
|
# 启动协调器服务
|
||||||
启动协调服务器。
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
"""
|
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
# 创建线程运行 run 方法
|
server.bind((self.host, self.port))
|
||||||
server_thread = threading.Thread(target=self.run)
|
server.listen(5)
|
||||||
server_thread.start()
|
print(f"Coordinator listening on {self.host}:{self.port}")
|
||||||
|
|
||||||
try:
|
while True:
|
||||||
# 主线程捕获键盘打断信号
|
conn, addr = server.accept()
|
||||||
while self.running:
|
# 为每个连接创建独立线程
|
||||||
time.sleep(1) # 防止主线程空转
|
client_thread = threading.Thread(
|
||||||
except KeyboardInterrupt:
|
target=self.handle_client,
|
||||||
logger.info("检测到键盘打断,准备退出...")
|
args=(conn, addr),
|
||||||
self.running = False
|
daemon=True
|
||||||
|
)
|
||||||
# 通知所有提供端停止服务
|
client_thread.start()
|
||||||
for provider_id, provider_info in self.providers.items():
|
|
||||||
try:
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_provider',
|
|
||||||
}).encode(), provider_info['addr'])
|
|
||||||
logger.info(f"已通知提供端 {provider_id} 停止服务")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"通知提供端 {provider_id} 停止服务时出错: {e}")
|
|
||||||
server_thread.join() # 等待服务器线程退出
|
|
||||||
|
|
||||||
# 关闭线程池和套接字
|
|
||||||
self.executor.shutdown()
|
|
||||||
self.udp_sock.close()
|
|
||||||
logger.info("协调服务器已安全退出")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
server = CoordinatorServer()
|
coordinator = Coordinator()
|
||||||
server.start()
|
coordinator.start()
|
||||||
467
provider.py
467
provider.py
@ -1,348 +1,185 @@
|
|||||||
import socket
|
import socket
|
||||||
import json
|
|
||||||
import threading
|
import threading
|
||||||
|
import json
|
||||||
|
import hashlib
|
||||||
import time
|
import time
|
||||||
import uuid
|
import struct
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# 配置日志
|
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class ServiceProvider:
|
# 定义 Provider 类,用于处理与协调器的连接和P2P通信
|
||||||
def __init__(self, coordinator_addr, services):
|
class Provider:
|
||||||
"""
|
def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000):
|
||||||
初始化服务提供者
|
# 初始化协调器的主机和端口
|
||||||
:param coordinator_addr: 协调服务器地址 (IP, port)
|
self.coordinator_host = coordinator_host
|
||||||
:param services: 提供的服务列表 {服务名: 端口号}
|
self.coordinator_port = coordinator_port
|
||||||
"""
|
# 用于存储认证令牌
|
||||||
self.provider_id = f"provider-{uuid.uuid4().hex[:8]}"
|
self.token = None
|
||||||
self.coordinator_addr = coordinator_addr
|
# 定义可提供的服务及其默认端口
|
||||||
self.services = services
|
self.service_ports = {'ssh': 22, 'alist': 5244, 'minecraft': 25565}
|
||||||
|
# 存储连接的客户端
|
||||||
|
self.connections = {}
|
||||||
|
# 用于线程安全操作的锁
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
# 创建UDP套接字用于协调通信
|
def connect_to_coordinator(self):
|
||||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
# 创建与协调器的TCP连接
|
||||||
self.udp_sock.bind(('0.0.0.0', 0))
|
self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
self.coord_conn.connect((self.coordinator_host, self.coordinator_port))
|
||||||
|
|
||||||
# 创建线程池用于处理UDP消息
|
# 发送登录请求
|
||||||
self.thread_pool = ThreadPoolExecutor(max_workers=10)
|
self._send_json({'action': 'login', 'account': 'admin'})
|
||||||
|
response = self._recv_json()
|
||||||
|
|
||||||
self.clients = {}
|
# 处理协调器返回的盐值并进行密码哈希验证
|
||||||
# 存储活动连接
|
if response.get('status') == 'salt':
|
||||||
self.active_connections = {}
|
salt = response['salt']
|
||||||
self.running = True
|
password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest()
|
||||||
|
self._send_json({'action': 'auth', 'hash': password_hash})
|
||||||
|
response = self._recv_json()
|
||||||
|
|
||||||
# 心跳线程
|
# 如果认证成功,存储令牌并注册服务
|
||||||
self.heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True)
|
if response.get('status') == 'success':
|
||||||
|
self.token = response['token']
|
||||||
|
print(f"Authenticated. Token: {self.token}")
|
||||||
|
|
||||||
def send_heartbeat(self):
|
self._send_json({
|
||||||
"""
|
'action': 'register_service',
|
||||||
发送心跳包到协调服务器
|
'services': list(self.service_ports.keys()),
|
||||||
"""
|
'token': self.token
|
||||||
while self.running:
|
})
|
||||||
try:
|
response = self._recv_json()
|
||||||
message = {
|
if response.get('status') == 'success':
|
||||||
'action': 'heartbeat',
|
print("Services registered")
|
||||||
'provider_id': self.provider_id
|
|
||||||
}
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
logger.debug(f"发送心跳包给协调服务器 {self.coordinator_addr}")
|
|
||||||
time.sleep(20) # 每20秒发送一次心跳包
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"发送心跳包失败: {str(e)}")
|
|
||||||
|
|
||||||
def register_service(self):
|
|
||||||
"""
|
|
||||||
向协调服务器注册服务
|
|
||||||
:return: 注册是否成功
|
|
||||||
"""
|
|
||||||
message = {
|
|
||||||
'action': 'register',
|
|
||||||
'services': self.services,
|
|
||||||
'provider_id': self.provider_id
|
|
||||||
}
|
|
||||||
logger.info(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.services}'")
|
|
||||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
|
||||||
|
|
||||||
# 等待响应
|
|
||||||
try:
|
|
||||||
data, _ = self.udp_sock.recvfrom(4096)
|
|
||||||
response = json.loads(data.decode())
|
|
||||||
if response['status'] == 'success':
|
|
||||||
logger.info(f"服务 '{self.services}' 注册成功")
|
|
||||||
return True
|
return True
|
||||||
else:
|
print("Connection to coordinator failed")
|
||||||
logger.error(f"注册失败: {response['message']}")
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"注册服务时发生错误: {str(e)}")
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def handle_punch(self, message, addr):
|
def handle_punch_request(self, data):
|
||||||
"""
|
connector_addr = tuple(data['connector_addr'])
|
||||||
处理打洞请求
|
service_name = data['service_name']
|
||||||
:param message: 打洞请求消息
|
print(f"Punching hole to connector at {connector_addr}, waiting 10 seconds...")
|
||||||
:param addr: 客户端地址
|
|
||||||
"""
|
|
||||||
self.udp_sock.sendto(json.dumps(
|
|
||||||
{'action': 'punch_response',
|
|
||||||
'client_id': message['client_id'],
|
|
||||||
'provider_id': self.provider_id
|
|
||||||
}).encode(), addr)
|
|
||||||
logger.debug(f"收到来自 {addr} 的打洞请求,已响应")
|
|
||||||
|
|
||||||
def handle_punch_response(self, _, addr):
|
# Wait for 10 seconds to allow the connector to initiate its punch
|
||||||
"""
|
time.sleep(2)
|
||||||
处理打洞响应
|
|
||||||
:param addr: 客户端地址
|
|
||||||
"""
|
|
||||||
logger.debug(f"收到来自 {addr} 的打洞响应")
|
|
||||||
|
|
||||||
def handle_connect_request(self, message, addr):
|
# 使用UDP打洞
|
||||||
"""
|
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
处理连接请求
|
# 绑定到相同的本地端口(用于后续TCP连接)
|
||||||
:param message: 连接请求消息
|
udp_socket.bind(('0.0.0.0', 0))
|
||||||
:param addr: 客户端地址
|
punch_port = udp_socket.getsockname()[1]
|
||||||
"""
|
# 向对方发送打洞包
|
||||||
conn_id = message['conn_id']
|
for i in range(10):
|
||||||
client_id = message['client_id']
|
udp_socket.sendto(b'punch', connector_addr)
|
||||||
service_name = message['service_name']
|
time.sleep(0.2)
|
||||||
logger.debug(f"收到来自 {addr} 的连接请求")
|
|
||||||
|
|
||||||
|
punch_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
punch_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
punch_sock.settimeout(10)
|
||||||
|
punch_sock.bind(('0.0.0.0', punch_port))
|
||||||
|
|
||||||
|
try:
|
||||||
|
punch_sock.connect(connector_addr)
|
||||||
|
print("Successfully connected to connector after delay")
|
||||||
threading.Thread(
|
threading.Thread(
|
||||||
target=self.handle_connection,
|
target=self.handle_connector_connection,
|
||||||
args=(conn_id, addr, client_id, service_name),
|
args=(punch_sock, service_name),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
except socket.error as e:
|
||||||
|
print(f"Punching failed: {e}")
|
||||||
|
punch_sock.close()
|
||||||
|
|
||||||
|
def handle_connector_connection(self, sock, service_name):
|
||||||
|
# 处理与客户端的连接,启动心跳机制
|
||||||
|
threading.Thread(target=self.send_heartbeats, args=(sock,), daemon=True).start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
# 接收连接头信息
|
||||||
|
header = sock.recv(5)
|
||||||
|
if not header:
|
||||||
|
break
|
||||||
|
|
||||||
|
conn_id, data_len = struct.unpack("!I B", header)
|
||||||
|
data = sock.recv(data_len) if data_len > 0 else b''
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
with self.lock:
|
||||||
|
if conn_id in self.connections:
|
||||||
|
self.connections[conn_id].close()
|
||||||
|
del self.connections[conn_id]
|
||||||
|
continue
|
||||||
|
|
||||||
|
with self.lock:
|
||||||
|
if conn_id not in self.connections:
|
||||||
|
service_port = self.service_ports.get(service_name, 22)
|
||||||
|
service_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
service_sock.connect(('127.0.0.1', service_port))
|
||||||
|
self.connections[conn_id] = service_sock
|
||||||
|
threading.Thread(
|
||||||
|
target=self.forward_data,
|
||||||
|
args=(service_sock, sock, conn_id),
|
||||||
daemon=True
|
daemon=True
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
def handle_punch_request(self, message, _):
|
self.connections[conn_id].sendall(data)
|
||||||
"""
|
except ConnectionResetError:
|
||||||
处理打洞请求
|
pass
|
||||||
:param message: 打洞请求消息
|
finally:
|
||||||
"""
|
sock.close()
|
||||||
for i in range(10):
|
with self.lock:
|
||||||
try:
|
for conn_id, service_sock in list(self.connections.items()):
|
||||||
self.udp_sock.sendto(json.dumps({
|
service_sock.close()
|
||||||
'action': 'punch',
|
self.connections.clear()
|
||||||
'client_id': message['client_id'],
|
|
||||||
'provider_id': self.provider_id,
|
|
||||||
}).encode(), tuple(message['client_addr']))
|
|
||||||
time.sleep(0.5)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"打洞失败: {str(e)}")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
def handle_data(self, message, addr):
|
def forward_data(self, src, dst, conn_id):
|
||||||
"""
|
# 转发数据
|
||||||
处理数据消息
|
|
||||||
:param message: 数据消息
|
|
||||||
:param addr: 客户端地址
|
|
||||||
"""
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
data = bytes.fromhex(message['data'])
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
# 转发数据到本地服务
|
|
||||||
logger.debug(f"收到来自 {addr} 的数据,转发到本地服务")
|
|
||||||
self.active_connections[conn_id]['local_sock'].sendall(data)
|
|
||||||
else:
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_conn',
|
|
||||||
'conn_id': conn_id
|
|
||||||
}).encode(), addr)
|
|
||||||
logger.debug(f"收到来自 {addr} 的数据,但未找到对应的连接")
|
|
||||||
|
|
||||||
def handle_stop_conn(self, message, _):
|
|
||||||
"""
|
|
||||||
处理停止连接请求
|
|
||||||
:param message: 停止连接请求消息
|
|
||||||
"""
|
|
||||||
conn_id = message['conn_id']
|
|
||||||
if conn_id in self.active_connections:
|
|
||||||
self.active_connections[conn_id]['local_sock'].close()
|
|
||||||
self.active_connections.pop(conn_id, None)
|
|
||||||
|
|
||||||
def handle_stop_client(self, message, addr):
|
|
||||||
"""
|
|
||||||
处理停止客户端请求
|
|
||||||
:param message: 停止客户端请求消息
|
|
||||||
:param addr: 客户端地址
|
|
||||||
"""
|
|
||||||
client_id = message['client_id']
|
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
|
||||||
if conn_info['client_id'] == client_id:
|
|
||||||
conn_info['local_sock'].close()
|
|
||||||
self.active_connections.pop(conn_id, None)
|
|
||||||
|
|
||||||
def handle_stop_provider(self, message, _):
|
|
||||||
"""
|
|
||||||
处理停止服务提供者请求
|
|
||||||
:param message: 停止服务提供者请求消息
|
|
||||||
"""
|
|
||||||
logger.info("收到停止服务提供者请求,正在关闭所有连接...")
|
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
|
||||||
conn_info['local_sock'].close()
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_conn',
|
|
||||||
'conn_id': conn_id
|
|
||||||
}).encode(), conn_info['client_addr'])
|
|
||||||
self.active_connections.clear()
|
|
||||||
self.running = False
|
|
||||||
self.udp_sock.close()
|
|
||||||
self.thread_pool.shutdown(wait=True)
|
|
||||||
logger.info("服务提供者已停止")
|
|
||||||
|
|
||||||
def handle_connection(self, conn_id, client_addr, client_id, service_name):
|
|
||||||
"""
|
|
||||||
处理来自客户端的连接
|
|
||||||
:param conn_id: 连接ID
|
|
||||||
:param client_addr: 客户端地址
|
|
||||||
:param client_id: 客户端ID
|
|
||||||
:param service_name: 服务名称
|
|
||||||
"""
|
|
||||||
internal_port = self.services[service_name]
|
|
||||||
try:
|
|
||||||
# 接受本地服务连接
|
|
||||||
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
local_sock.connect(('127.0.0.1', internal_port))
|
|
||||||
if not local_sock:
|
|
||||||
logger.error("无法连接到本地服务")
|
|
||||||
return
|
|
||||||
|
|
||||||
# 创建与客户端的UDP隧道
|
|
||||||
logger.debug(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', internal_port)}")
|
|
||||||
|
|
||||||
# 存储连接
|
|
||||||
self.active_connections[conn_id] = {
|
|
||||||
'local_sock': local_sock,
|
|
||||||
'client_addr': client_addr,
|
|
||||||
'client_id': client_id
|
|
||||||
}
|
|
||||||
|
|
||||||
# 通知客户端连接就绪
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'connected',
|
|
||||||
'client_id': conn_id
|
|
||||||
}).encode(), client_addr)
|
|
||||||
|
|
||||||
# 启动数据转发
|
|
||||||
self.forward_data(conn_id, local_sock, client_addr)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"连接失败: {str(e)}")
|
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'connect_failed',
|
|
||||||
'client_id': conn_id,
|
|
||||||
'message': str(e)
|
|
||||||
}).encode(), client_addr)
|
|
||||||
|
|
||||||
def forward_data(self, conn_id, local_sock, client_addr):
|
|
||||||
"""
|
|
||||||
转发TCP数据到UDP隧道
|
|
||||||
:param conn_id: 连接ID
|
|
||||||
:param local_sock: 本地服务套接字
|
|
||||||
:param client_addr: 客户端地址
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
# 从本地服务读取数据
|
data = src.recv(4096)
|
||||||
data = local_sock.recv(4096)
|
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
|
header = struct.pack("!I B", conn_id, len(data))
|
||||||
# 通过UDP发送给客户端
|
dst.sendall(header + data)
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'data',
|
|
||||||
'conn_id': conn_id,
|
|
||||||
'data': data.hex() # 十六进制编码二进制数据
|
|
||||||
}).encode(), client_addr)
|
|
||||||
logger.debug(f"转发数据给客户端{client_addr}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"转发数据失败: {str(e)}")
|
|
||||||
finally:
|
finally:
|
||||||
local_sock.close()
|
src.close()
|
||||||
if conn_id in self.active_connections:
|
with self.lock:
|
||||||
del self.active_connections[conn_id]
|
if conn_id in self.connections:
|
||||||
logger.debug(f"连接 {conn_id} 已关闭")
|
del self.connections[conn_id]
|
||||||
|
|
||||||
def udp_listener(self):
|
def send_heartbeats(self, sock):
|
||||||
"""
|
# 发送心跳包以保持连接
|
||||||
监听UDP消息并处理
|
while True:
|
||||||
"""
|
|
||||||
data = None
|
|
||||||
while self.running:
|
|
||||||
try:
|
try:
|
||||||
data, addr = self.udp_sock.recvfrom(4096)
|
sock.sendall(b'\x00\x00\x00\x00\x00') # Empty heartbeat
|
||||||
logger.debug(f"收到来自 {addr} 的消息: {data}")
|
time.sleep(5)
|
||||||
message = json.loads(data.decode())
|
except:
|
||||||
|
break
|
||||||
|
|
||||||
# 使用字典映射处理不同消息类型
|
def start(self):
|
||||||
action_handlers = {
|
# 启动提供者,连接到协调器并开始处理请求
|
||||||
'punch': self.handle_punch,
|
if not self.connect_to_coordinator():
|
||||||
'punch_check': self.handle_punch,
|
|
||||||
'punch_response': self.handle_punch_response,
|
|
||||||
'connect': self.handle_connect_request,
|
|
||||||
'punch_request': self.handle_punch_request,
|
|
||||||
'data': self.handle_data,
|
|
||||||
'stop_conn': self.handle_stop_conn,
|
|
||||||
'stop_client': self.handle_stop_client,
|
|
||||||
'stop_provider': self.handle_stop_provider,
|
|
||||||
}
|
|
||||||
|
|
||||||
# 提交任务到线程池
|
|
||||||
if message.get('action') in action_handlers:
|
|
||||||
self.thread_pool.submit(action_handlers[message['action']], message, addr)
|
|
||||||
elif message.get('status') == 'error':
|
|
||||||
logger.error(f"来自 {addr} 的错误消息: {message}")
|
|
||||||
elif message.get('status') == 'success':
|
|
||||||
logger.debug(f"来自 {addr} 的成功消息: {message}")
|
|
||||||
else:
|
|
||||||
logger.warning(f"收到未知消息: {message}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"处理UDP消息时发生错误: {str(e)}")
|
|
||||||
if data:
|
|
||||||
logger.error(f"无法处理消息: {data}")
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""
|
|
||||||
运行服务提供端
|
|
||||||
"""
|
|
||||||
# 注册服务
|
|
||||||
if not self.register_service():
|
|
||||||
logger.error("服务注册失败,退出程序")
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# 启动UDP监听线程
|
|
||||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
|
||||||
|
|
||||||
# 启动心跳线程
|
|
||||||
self.heartbeat_thread.start()
|
|
||||||
|
|
||||||
# 保持主线程运行
|
|
||||||
try:
|
try:
|
||||||
while self.running:
|
while True:
|
||||||
time.sleep(1)
|
data = self._recv_json()
|
||||||
except KeyboardInterrupt:
|
if data and data.get('action') == 'punch_request':
|
||||||
self.running = False
|
self.handle_punch_request(data)
|
||||||
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
except (ConnectionResetError, json.JSONDecodeError):
|
||||||
for conn_id, conn_info in self.active_connections.items():
|
print("Disconnected from coordinator")
|
||||||
self.udp_sock.sendto(json.dumps({
|
|
||||||
'action': 'stop_conn',
|
def _send_json(self, data):
|
||||||
'conn_id': conn_id
|
# 发送JSON数据
|
||||||
}).encode(), conn_info['client_addr'])
|
self.coord_conn.sendall(json.dumps(data).encode())
|
||||||
self.udp_sock.close()
|
|
||||||
logger.info("服务提供端已停止")
|
def _recv_json(self):
|
||||||
|
# 接收JSON数据
|
||||||
|
data = self.coord_conn.recv(4096)
|
||||||
|
return json.loads(data.decode()) if data else None
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
# 配置信息
|
provider = Provider(coordinator_host='www.awin-x.top')
|
||||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
provider.start()
|
||||||
SERVICES = {
|
|
||||||
'terraria-jk-2cxht5': 5001,
|
|
||||||
'minecraft-jk-ytsvb54u6': 5002,
|
|
||||||
'alist-jk-5shf43h6fdg': 5244,
|
|
||||||
'ssh-jk-54htrsd324n6': 22
|
|
||||||
}
|
|
||||||
|
|
||||||
provider = ServiceProvider(COORDINATOR_ADDR, SERVICES)
|
|
||||||
provider.run()
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user