diff --git a/connector.py b/connector.py index bdb1934..93f03fc 100644 --- a/connector.py +++ b/connector.py @@ -1,317 +1,178 @@ import socket -import json import threading -from concurrent.futures import ThreadPoolExecutor +import json +import hashlib +import struct import time -import uuid -import logging - -# 配置日志 -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) -class ServiceConnector: - def __init__(self, coordinator_addr, service_name, local_port): - """ - 初始化服务连接器 - :param coordinator_addr: 协调服务器地址 (IP, port) - :param service_name: 请求的服务名称 - :param local_port: 本地监听端口 - """ - self.coordinator_addr = coordinator_addr - self.service_name = service_name +class Connector: + def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000, local_port=2222): + self.coordinator_host = coordinator_host + self.coordinator_port = coordinator_port self.local_port = local_port - self.client_id = f"connector-{uuid.uuid4().hex[:8]}" + self.token = None + self.connections = {} + self.conn_counter = 0 + self.lock = threading.Lock() - # 创建UDP套接字用于协调通信 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + def connect_to_coordinator(self): + self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.coord_conn.connect((self.coordinator_host, self.coordinator_port)) - # 创建TCP套接字用于本地监听 - self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.tcp_sock.bind(('127.0.0.1', local_port)) - self.tcp_sock.listen(5) - logger.info(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") + # Login + self._send_json({'action': 'login', 'account': 'admin'}) + response = self._recv_json() - # 注册线程池 - self.thread_pool = ThreadPoolExecutor(max_workers=10) + if response.get('status') == 'salt': + salt = response['salt'] + password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest() + self._send_json({'action': 'auth', 'hash': password_hash}) + response = self._recv_json() - # 存储活动连接 - self.active_connections = {} - self.provider_addr = None - self.provider_id = None - self.internal_port = None - self.running = True - - def request_service(self): - """ - 向协调服务器请求服务 - :return: 请求是否成功 - """ - message = { - 'action': 'request', - 'service_name': self.service_name, - 'client_id': self.client_id - } - logger.info(f"向协调服务器 {self.coordinator_addr} 请求服务 '{self.service_name}'") - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - - # 等待响应 - try: - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] == 'success': - self.provider_addr = tuple(response['provider_addr']) - self.internal_port = response['internal_port'] - self.provider_id = response['provider_id'] - logger.info(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}") + if response.get('status') == 'success': + self.token = response['token'] + print(f"Authenticated. Token: {self.token}") return True - else: - logger.error(f"服务请求失败: {response['message']}") - return False - except Exception as e: - logger.error(f"请求服务时发生错误: {str(e)}") - return False + print("Connection to coordinator failed") + return False - def punch_hole(self): - """ - 执行UDP打洞 - :return: 打洞是否成功 - """ - if not self.provider_addr: - return False + def request_service(self, service_name): + self._send_json({ + 'action': 'request_service', + 'service_name': service_name, + 'token': self.token + }) + response = self._recv_json() - # 请求协服务器发起打洞 - message = { - 'action': 'punch_request', - 'client_id': self.client_id, - 'provider_id': self.provider_id - } - logger.info(f"向协调服务器 {self.coordinator_addr} 请求打洞到 {self.provider_addr}") - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + if response.get('status') == 'success': + provider_addr = tuple(response['provider_addr']) + print(f"Connecting to provider at {provider_addr}") - # 等待协调服务器响应 - try: - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] != 'success': - logger.error(f"打洞请求失败: {response['message']}") - return False - - # 向服务提供者发送打洞包 - logger.info(f"尝试打洞到 {self.provider_addr}...") - for _ in range(10): - self.udp_sock.sendto(json.dumps({ - 'action': 'punch', - 'client_id': self.client_id - }).encode(), self.provider_addr) + # 使用UDP打洞 + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # 绑定到相同的本地端口(用于后续TCP连接) + udp_socket.bind(('0.0.0.0', 0)) + punch_port = udp_socket.getsockname()[1] + # 向对方发送打洞包 + for i in range(10): + udp_socket.sendto(b'punch', provider_addr) time.sleep(0.2) - # 检查连通性 - self.udp_sock.settimeout(10.0) + # Start listening for incoming connections from provider + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind(('0.0.0.0', punch_port)) + listener.listen(5) + print(f"Listening on port {punch_port} for provider connections") + + # Start handler thread to accept provider's connection + threading.Thread( + target=self.handle_provider_connection, + args=(listener, service_name), + daemon=True + ).start() + return True + print("Failed to request service") + return False + + def handle_provider_connection(self, listener, service_name): + # Accept connection from provider + try: + provider_sock, addr = listener.accept() + print(f"Accepted provider connection from {addr}") + # Start heartbeat monitoring + threading.Thread( + target=self.monitor_heartbeats, + args=(provider_sock,), + daemon=True + ).start() + + # Start client listener to accept local clients + client_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + client_listener.bind(('0.0.0.0', self.local_port)) + client_listener.listen(5) + print(f"Client listener started on port {self.local_port}") + try: - self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) - data, addr = self.udp_sock.recvfrom(4096) - if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: - logger.info("打洞成功! 已建立UDP连接") - return True - else: - logger.error(f"错误的打洞响应{data}") - return False - except socket.timeout: - logger.error("打洞失败: 未收到响应") - return False + while True: + client_sock, addr = client_listener.accept() + print(f"New client from {addr}") + with self.lock: + conn_id = self.conn_counter + self.conn_counter += 1 + + threading.Thread( + target=self.handle_client_connection, + args=(client_sock, provider_sock, conn_id), + daemon=True + ).start() + finally: + client_listener.close() finally: - self.udp_sock.settimeout(None) + listener.close() - def handle_punch_response(self, message, addr): - """ - 处理打洞响应 - :param message: 打洞响应消息 - :param addr: 服务提供者地址 - """ - logger.debug(f"收到来自 {addr} 的打洞响应") + def handle_client_connection(self, client_sock, provider_sock, conn_id): + self.connections[conn_id] = client_sock - def handle_stop_conn(self, message, addr): - """ - 处理停止连接请求 - :param message: 停止连接请求消息 - :param addr: 服务提供者地址 - """ - conn_id = message['conn_id'] - if conn_id in self.active_connections: - self.active_connections[conn_id].close() - self.active_connections.pop(conn_id, None) - logger.debug(f"关闭本地连接 {conn_id}") - - def tcp_listener(self): - """ - 监听本地TCP连接 - """ - while self.running: - try: - client_sock, client_addr = self.tcp_sock.accept() - logger.debug(f"新的本地连接来自 {client_addr}") - - # 为每个连接生成唯一ID - conn_id = str(uuid.uuid4()) - - # 存储连接 - self.active_connections[conn_id] = client_sock - - # 请求服务提供者建立连接 - self.udp_sock.sendto(json.dumps({ - 'action': 'connect', - 'client_id': self.client_id, - 'conn_id': conn_id, - 'service_name': self.service_name - }).encode(), self.provider_addr) - - time.sleep(0.5) - - # 启动数据转发线程 - threading.Thread( - target=self.forward_data, - args=(conn_id, client_sock), - daemon=True - ).start() - - except Exception as e: - logger.error(f"处理本地连接时发生错误: {str(e)}") - - def handel_punch(self, message, addr): - """ - 处理UDP打洞请求 - :param message: 打洞请求消息 - :param addr: 服务提供者地址 - """ - logger.debug(f"收到来自 {addr} 的UDP打洞请求") - self.udp_sock.sendto(json.dumps({ - 'action': 'punch_response', - 'client_id': self.client_id, - 'provider_id': self.provider_id - }).encode(), addr) - - - def handle_data(self, message, addr): - """ - 处理数据消息 - :param message: 数据消息 - :param addr: 服务提供者地址 - """ - conn_id = message['conn_id'] - data = bytes.fromhex(message['data']) - if conn_id in self.active_connections: - # 转发数据到本地客户端 - logger.debug(f"收到来自 {addr} 的数据,转发到本地连接 {conn_id}") - self.active_connections[conn_id].sendall(data) - else: - self.udp_sock.sendto(json.dumps({'action': 'stop_conn', 'conn_id': conn_id}).encode(), addr) - logger.debug(f"收到来自 {addr} 的数据,但未找到对应的本地连接") - - def forward_data(self, conn_id, client_sock): - """ - 转发本地TCP数据到UDP隧道 - :param conn_id: 连接ID - :param client_sock: 本地客户端套接字 - """ try: while True: - # 从本地客户端读取数据 data = client_sock.recv(4096) if not data: break - - # 通过UDP发送给服务提供者 - logger.debug(f"发送数据到服务提供者: {self.provider_addr}") - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), self.provider_addr) - except Exception as e: - logger.error(f"转发数据失败: {str(e)}") + header = struct.pack("!I B", conn_id, len(data)) + provider_sock.sendall(header + data) finally: client_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), self.provider_addr) - logger.debug(f"连接 {conn_id} 已关闭") + with self.lock: + if conn_id in self.connections: + del self.connections[conn_id] - def udp_listener(self): - """ - 监听UDP消息并处理 - """ - while self.running: + def monitor_heartbeats(self, sock): + last_heartbeat = time.time() + while True: try: - data, addr = self.udp_sock.recvfrom(65535) - logger.debug(f"收到来自 {addr} 的消息: {data}") - message = json.loads(data.decode()) + header = sock.recv(5) + if not header: + break - # 使用字典映射处理不同消息类型 - action_handlers = { - 'punch_response': self.handle_punch_response, - 'data': self.handle_data, - 'stop_conn': self.handle_stop_conn, - 'punch': self.handel_punch - } + conn_id, data_len = struct.unpack("!I B", header) + data = sock.recv(data_len) if data_len > 0 else b'' - # 提交任务到线程池 - if message.get('action') in action_handlers: - self.thread_pool.submit(action_handlers[message['action']], message, addr) - else: - logger.warning(f"收到未知消息: {message}") - except Exception as e: - logger.error(f"处理UDP消息时发生错误: {str(e)}") + # Check if heartbeat + if conn_id == 0 and data_len == 0: + last_heartbeat = time.time() + continue - def run(self): - """ - 运行服务连接端 - """ - # 请求服务 - if not self.request_service(): - logger.error("服务请求失败,退出程序") + # Forward data to client + with self.lock: + if conn_id in self.connections: + self.connections[conn_id].sendall(data) + except ConnectionResetError: + break + + # Check heartbeat timeout + if time.time() - last_heartbeat > 10: + print("Heartbeat timeout") + break + + def start(self, service_name='ssh'): + if not self.connect_to_coordinator(): return - # 执行打洞 - if not self.punch_hole(): - logger.error("打洞失败,退出程序") - return - - # 启动UDP监听线程 - threading.Thread(target=self.udp_listener, daemon=True).start() - - # 启动TCP监听线程 - threading.Thread(target=self.tcp_listener, daemon=True).start() - - # 保持主线程运行 - try: - while self.running: + if self.request_service(service_name): + while True: time.sleep(1) - except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_client', - 'client_id': self.client_id - }).encode(), self.provider_addr) - self.udp_sock.close() - self.tcp_sock.close() - logger.info("服务连接端已停止") + + def _send_json(self, data): + self.coord_conn.sendall(json.dumps(data).encode()) + + def _recv_json(self): + data = self.coord_conn.recv(4096) + return json.loads(data.decode()) if data else None -if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICE_NAME = "ssh-jk-54htrsd324n6" - # SERVICE_NAME = "terraria-jk-2cxht5" - # SERVICE_NAME = "minecraft-jk-ytsvb54u6" - # SERVICE_NAME = "alist-jk-5shf43h6fdg" - LOCAL_PORT = 12345 # 本地映射端口 - - connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) - connector.run() +if __name__ == "__main__": + connector = Connector(coordinator_host='www.awin-x.top',local_port=2222) + connector.start(service_name='ssh') \ No newline at end of file diff --git a/coordinator.py b/coordinator.py index b465429..391ecff 100644 --- a/coordinator.py +++ b/coordinator.py @@ -1,259 +1,178 @@ import socket -import json -import time -import logging -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor import threading - -# 设置日志配置 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) +import json +import os +import hashlib +import secrets +import time +from collections import defaultdict -class CoordinatorServer: +class Coordinator: def __init__(self, host='0.0.0.0', port=5000): - """ - 初始化协调服务器。 - :param host: 服务器绑定的主机地址,默认为 '0.0.0.0'。 - :param port: 服务器监听的端口号,默认为 5000。 - """ - self.host = host - self.port = port - self.clients = defaultdict(dict) # 客户端信息字典 - self.providers = defaultdict(dict) # 服务提供端信息字典 - self.services = defaultdict(tuple) # 服务名称与提供者信息的映射 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind((host, port)) - self.udp_sock.settimeout(1) # 设置 UDP 套接字接收超时时间为 1 秒 - self.running = True - self.executor = ThreadPoolExecutor(max_workers=10) # 创建线程池 - logger.debug(f"协调服务器运行在 {host}:{port}") + # 初始化协调器服务端参数 + self.host = host # 监听地址 + self.port = port # 监听端口 + # 生成盐值用于密码加密 + self.salt = secrets.token_hex(8) + # 存储管理员密码哈希值(盐+密码) + self.stored_hash = hashlib.sha256((self.salt + "admin_password").encode()).hexdigest() + # 存储用户令牌信息 + self.tokens = {} + # 存储服务注册信息,格式:{token: {services: [], addr: (), conn: socket}} + self.services = defaultdict(dict) + # 活动连接池(当前未使用) + self.active_connections = {} + # 线程锁保证数据安全 + self.lock = threading.Lock() - # 启动定时清理任务 - self.executor.submit(self.cleanup_expired_services) + def handle_client(self, conn, addr): + # 处理客户端连接 + print(f"New connection from {addr}") + token = None + salt = secrets.token_hex(8) + stored_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest() - def handle_register(self, data, addr): - """ - 处理服务注册请求。 - :param data: 包含服务注册信息的字典。 - :param addr: 发送请求的客户端地址。 - """ try: - provider_id = data['provider_id'] - services = data['services'] + while True: + # 接收客户端JSON数据 + data = self.recv_json(conn) + if not data: + break - # 记录客户端信息 - self.providers[provider_id] = { - 'addr': addr, - 'services': services, - 'last_seen': time.time() - } + action = data.get('action') - # 遍历 services 字典,记录每个服务的名称和端口 - for service_name, service_port in services.items(): - self.services[service_name] = (addr, service_port, provider_id) + # 登录流程:发送盐值 + if action == 'login': + if data.get('account') == 'admin': + response = {'status': 'salt', 'salt': salt} + self.send_json(conn, response) + else: + self.send_json(conn, {'status': 'error', 'message': 'Invalid account'}) - logger.info(f"注册来自{addr}:{services}") + # 认证流程:验证密码哈希 + elif action == 'auth': + if data.get('hash') == stored_hash: + # 生成访问令牌(有效期1小时) + token = secrets.token_hex(8) + with self.lock: + self.tokens[token] = { + 'ip': addr[0], + 'expiry': time.time() + 3600 # 令牌过期时间 + } + response = {'status': 'success', 'token': token, 'message': 'Login successful'} + self.send_json(conn, response) + else: + self.send_json(conn, {'status': 'error', 'message': 'Authentication failed'}) - # 直接回复注册成功消息 - response = {'status': 'success', 'message': '服务注册成功'} - self.udp_sock.sendto(json.dumps(response).encode(), addr) - except Exception as e: - response = {'status': 'error', 'message': str(e)} - self.udp_sock.sendto(json.dumps(response).encode(), addr) + # 服务注册流程 + elif action == 'register_service': + client_token = data.get('token') + if self.validate_token(client_token, addr[0]): + services = data.get('services', []) + with self.lock: + self.services[client_token] = { + 'services': services, # 支持的服务列表 + 'addr': addr, # 客户端地址信息 + 'conn': conn # 客户端连接套接字 + } + self.send_json(conn, {'status': 'success', 'message': 'Services registered'}) + else: + self.send_json(conn, {'status': 'error', 'message': 'Invalid token'}) - def handle_request(self, data, addr): - """ - 处理服务请求。 - :param data: 包含服务请求信息的字典。 - :param addr: 发送请求的客户端地址。 - """ - try: - service_name = data['service_name'] - client_id = data['client_id'] + # 服务请求流程 + elif action == 'request_service': + client_token = data.get('token') + if not self.validate_token(client_token, addr[0]): + self.send_json(conn, {'status': 'error', 'message': 'Invalid token'}) + continue - if service_name not in self.services: - response = {'status': 'error', 'message': '服务未找到'} - self.udp_sock.sendto(json.dumps(response).encode(), addr) + service_name = data.get('service_name') + provider_token = self.find_service_provider(service_name) - # 记录请求客户端信息 - self.clients[client_id] = { - 'addr': addr, - 'service_name': service_name, - 'last_seen': time.time() - } + if provider_token: + provider_info = self.services[provider_token] + provider_addr = provider_info['addr'] + connector_addr = addr - # 获取服务提供者的信息 - provider_addr, internal_port, provider_id = self.services[service_name] + # 通知服务提供方进行NAT打洞 + punch_msg = { + 'action': 'punch_request', + 'connector_addr': connector_addr, # 请求方地址 + 'service_name': service_name # 请求的服务名称 + } + self.send_json(provider_info['conn'], punch_msg) - logger.debug(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") + # 响应请求方 + self.send_json(conn, { + 'status': 'success', + 'provider_addr': provider_addr # 提供方地址信息 + }) - response = { - 'status': 'success', - 'provider_addr': provider_addr, - 'internal_port': internal_port, - 'provider_id': provider_id - } - self.udp_sock.sendto(json.dumps(response).encode(), addr) - except Exception as e: - response = {'status': 'error', 'message': str(e)} - self.udp_sock.sendto(json.dumps(response).encode(), addr) + # 使用后立即销毁令牌 + with self.lock: + if client_token in self.tokens: + del self.tokens[client_token] + else: + self.send_json(conn, {'status': 'error', 'message': 'Service not available'}) + except (ConnectionResetError, json.JSONDecodeError): + pass + finally: + conn.close() + print(f"Connection closed: {addr}") + # 清理资源 + if token: + with self.lock: + if token in self.tokens: + del self.tokens[token] + if token in self.services: + del self.services[token] - def handle_punch_request(self, data, addr): - """ - 处理打洞请求。 - :param data: 包含打洞请求信息的字典。 - :param addr: 发送请求的客户端地址。 - """ - try: - client_id = data['client_id'] - provider_id = data['provider_id'] + def validate_token(self, token, ip): + # 验证令牌有效性:存在、IP匹配、未过期 + with self.lock: + token_info = self.tokens.get(token) + if token_info and token_info['ip'] == ip and token_info['expiry'] > time.time(): + return True + return False - # 获取目标客户端信息 - if provider_id not in self.providers: - response = {'status': 'error', 'message': '目标提供端未找到'} - self.udp_sock.sendto(json.dumps(response).encode(), addr) + def find_service_provider(self, service_name): + # 查找可用服务提供者 + for token, info in self.services.items(): + if service_name in info['services']: + return token + return None - provider_addr = self.providers[provider_id]['addr'] + def recv_json(self, conn): + # 接收并解析JSON数据 + data = conn.recv(4096) + if not data: + return None + return json.loads(data.decode()) - logger.info(f"打洞请求: {addr} -> {provider_id}") - - # 通知双方对方的地址 - self.udp_sock.sendto(json.dumps({ - 'action': 'punch_request', - 'client_id': client_id, - 'client_addr': addr - }).encode(), provider_addr) - - self.udp_sock.sendto(json.dumps({ - 'status': 'success', - 'provider_addr': provider_addr - }).encode(), addr) - except Exception as e: - logger.error(f"处理打洞请求时出错: {e}") - response = {'status': 'error', 'message': str(e)} - self.udp_sock.sendto(json.dumps(response).encode(), addr) - - def handle_stop_provider(self, data, addr): - """ - 处理停止服务请求。 - :param data: 包含停止服务信息的字典。 - :param addr: 发送请求的客户端地址。 - """ - try: - service_name = data['service_name'] - self.services.pop(service_name, None) - response = {'status': 'success', 'message': '服务停止成功'} - self.udp_sock.sendto(json.dumps(response).encode(), addr) - except Exception as e: - response = {'status': 'error', 'message': str(e)} - self.udp_sock.sendto(json.dumps(response).encode(), addr) - - def handle_heartbeat(self, data, addr): - """ - 处理心跳包。 - :param data: 包含心跳信息的字典。 - :param addr: 发送心跳的客户端地址。 - """ - try: - provider_id = data['provider_id'] - self.providers[provider_id]['last_seen'] = time.time() - response = {'status': 'success', 'message': '心跳更新成功'} - self.udp_sock.sendto(json.dumps(response).encode(), addr) - except Exception as e: - response = {'status': 'error', 'message': str(e)} - self.udp_sock.sendto(json.dumps(response).encode(), addr) - - def cleanup_expired_services(self): - """ - 定时清理过期服务。 - 每20秒检查一次,移除超过30秒未更新心跳的服务。 - """ - while self.running: - time.sleep(60) # 每60秒检查一次 - current_time = time.time() - expired_providers = [] - for provider_id, provider in self.providers.items(): - if current_time - provider['last_seen'] > 60: # 心跳包最后更新时间大于30秒 - expired_providers.append(provider_id) - logger.info(f"服务过期: {provider['addr']}") - for provider_id in expired_providers: - provider = self.providers[provider_id] - for service_name in provider['services'].keys(): # 使用新的 services 字段 - self.services.pop(service_name, None) - self.providers.pop(provider_id, None) - - def run(self): - """ - 运行协调服务器。 - """ - logger.info(f"协调服务器已启动,端口{self.udp_sock.getsockname()[1]},等待连接...") - action_handlers = { - 'register': self.handle_register, # 服务注册处理行为 - 'request': self.handle_request, # 服务请求处理行为 - 'punch_request': self.handle_punch_request, # 打洞处理行为 - 'stop_provider': self.handle_stop_provider, # 停止服务行为 - 'heartbeat': self.handle_heartbeat # 心跳处理行为 - } - - while self.running: - try: - data, addr = self.udp_sock.recvfrom(4096) - try: - message = json.loads(data.decode()) - action = message.get('action') - - handler = action_handlers.get(action) - if handler: # 如果存在对应的处理行为,则执行它 - self.executor.submit(handler, message, addr).result() - else: # 如果没有对应的处理行为,则返回错误响应 - self.udp_sock.sendto(json.dumps({ - 'status': 'error', - 'message': '无效操作'}).encode(), addr) - except json.JSONDecodeError: - self.udp_sock.sendto(json.dumps({ - 'status': 'error', - 'message': '无效的JSON数据' - }).encode(), addr) - except socket.timeout: # 捕获 UDP 接收超时异常 - pass # 不做任何处理,允许主线程继续执行 - except Exception as e: - logger.debug(f"服务器错误: {str(e)}") + def send_json(self, conn, data): + # 发送JSON数据 + conn.sendall(json.dumps(data).encode()) def start(self): - """ - 启动协调服务器。 - """ - # 创建线程运行 run 方法 - server_thread = threading.Thread(target=self.run) - server_thread.start() + # 启动协调器服务 + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind((self.host, self.port)) + server.listen(5) + print(f"Coordinator listening on {self.host}:{self.port}") - try: - # 主线程捕获键盘打断信号 - while self.running: - time.sleep(1) # 防止主线程空转 - except KeyboardInterrupt: - logger.info("检测到键盘打断,准备退出...") - self.running = False - - # 通知所有提供端停止服务 - for provider_id, provider_info in self.providers.items(): - try: - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_provider', - }).encode(), provider_info['addr']) - logger.info(f"已通知提供端 {provider_id} 停止服务") - except Exception as e: - logger.error(f"通知提供端 {provider_id} 停止服务时出错: {e}") - server_thread.join() # 等待服务器线程退出 - - # 关闭线程池和套接字 - self.executor.shutdown() - self.udp_sock.close() - logger.info("协调服务器已安全退出") + while True: + conn, addr = server.accept() + # 为每个连接创建独立线程 + client_thread = threading.Thread( + target=self.handle_client, + args=(conn, addr), + daemon=True + ) + client_thread.start() -if __name__ == '__main__': - server = CoordinatorServer() - server.start() +if __name__ == "__main__": + coordinator = Coordinator() + coordinator.start() \ No newline at end of file diff --git a/provider.py b/provider.py index e139334..874959f 100644 --- a/provider.py +++ b/provider.py @@ -1,348 +1,185 @@ import socket -import json import threading +import json +import hashlib import time -import uuid -from concurrent.futures import ThreadPoolExecutor -import logging - -# 配置日志 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) +import struct -class ServiceProvider: - def __init__(self, coordinator_addr, services): - """ - 初始化服务提供者 - :param coordinator_addr: 协调服务器地址 (IP, port) - :param services: 提供的服务列表 {服务名: 端口号} - """ - self.provider_id = f"provider-{uuid.uuid4().hex[:8]}" - self.coordinator_addr = coordinator_addr - self.services = services +# 定义 Provider 类,用于处理与协调器的连接和P2P通信 +class Provider: + def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000): + # 初始化协调器的主机和端口 + self.coordinator_host = coordinator_host + self.coordinator_port = coordinator_port + # 用于存储认证令牌 + self.token = None + # 定义可提供的服务及其默认端口 + self.service_ports = {'ssh': 22, 'alist': 5244, 'minecraft': 25565} + # 存储连接的客户端 + self.connections = {} + # 用于线程安全操作的锁 + self.lock = threading.Lock() - # 创建UDP套接字用于协调通信 - self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + def connect_to_coordinator(self): + # 创建与协调器的TCP连接 + self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.coord_conn.connect((self.coordinator_host, self.coordinator_port)) - # 创建线程池用于处理UDP消息 - self.thread_pool = ThreadPoolExecutor(max_workers=10) + # 发送登录请求 + self._send_json({'action': 'login', 'account': 'admin'}) + response = self._recv_json() - self.clients = {} - # 存储活动连接 - self.active_connections = {} - self.running = True + # 处理协调器返回的盐值并进行密码哈希验证 + if response.get('status') == 'salt': + salt = response['salt'] + password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest() + self._send_json({'action': 'auth', 'hash': password_hash}) + response = self._recv_json() - # 心跳线程 - self.heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True) + # 如果认证成功,存储令牌并注册服务 + if response.get('status') == 'success': + self.token = response['token'] + print(f"Authenticated. Token: {self.token}") - def send_heartbeat(self): - """ - 发送心跳包到协调服务器 - """ - while self.running: - try: - message = { - 'action': 'heartbeat', - 'provider_id': self.provider_id - } - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) - logger.debug(f"发送心跳包给协调服务器 {self.coordinator_addr}") - time.sleep(20) # 每20秒发送一次心跳包 - except Exception as e: - logger.error(f"发送心跳包失败: {str(e)}") + self._send_json({ + 'action': 'register_service', + 'services': list(self.service_ports.keys()), + 'token': self.token + }) + response = self._recv_json() + if response.get('status') == 'success': + print("Services registered") + return True + print("Connection to coordinator failed") + return False - def register_service(self): - """ - 向协调服务器注册服务 - :return: 注册是否成功 - """ - message = { - 'action': 'register', - 'services': self.services, - 'provider_id': self.provider_id - } - logger.info(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.services}'") - self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + def handle_punch_request(self, data): + connector_addr = tuple(data['connector_addr']) + service_name = data['service_name'] + print(f"Punching hole to connector at {connector_addr}, waiting 10 seconds...") - # 等待响应 - try: - data, _ = self.udp_sock.recvfrom(4096) - response = json.loads(data.decode()) - if response['status'] == 'success': - logger.info(f"服务 '{self.services}' 注册成功") - return True - else: - logger.error(f"注册失败: {response['message']}") - return False - except Exception as e: - logger.error(f"注册服务时发生错误: {str(e)}") - return False + # Wait for 10 seconds to allow the connector to initiate its punch + time.sleep(2) - def handle_punch(self, message, addr): - """ - 处理打洞请求 - :param message: 打洞请求消息 - :param addr: 客户端地址 - """ - self.udp_sock.sendto(json.dumps( - {'action': 'punch_response', - 'client_id': message['client_id'], - 'provider_id': self.provider_id - }).encode(), addr) - logger.debug(f"收到来自 {addr} 的打洞请求,已响应") - - def handle_punch_response(self, _, addr): - """ - 处理打洞响应 - :param addr: 客户端地址 - """ - logger.debug(f"收到来自 {addr} 的打洞响应") - - def handle_connect_request(self, message, addr): - """ - 处理连接请求 - :param message: 连接请求消息 - :param addr: 客户端地址 - """ - conn_id = message['conn_id'] - client_id = message['client_id'] - service_name = message['service_name'] - logger.debug(f"收到来自 {addr} 的连接请求") - threading.Thread( - target=self.handle_connection, - args=(conn_id, addr, client_id, service_name), - daemon=True - ).start() - - def handle_punch_request(self, message, _): - """ - 处理打洞请求 - :param message: 打洞请求消息 - """ + # 使用UDP打洞 + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # 绑定到相同的本地端口(用于后续TCP连接) + udp_socket.bind(('0.0.0.0', 0)) + punch_port = udp_socket.getsockname()[1] + # 向对方发送打洞包 for i in range(10): - try: - self.udp_sock.sendto(json.dumps({ - 'action': 'punch', - 'client_id': message['client_id'], - 'provider_id': self.provider_id, - }).encode(), tuple(message['client_addr'])) - time.sleep(0.5) - except Exception as e: - logger.error(f"打洞失败: {str(e)}") - time.sleep(1) + udp_socket.sendto(b'punch', connector_addr) + time.sleep(0.2) - def handle_data(self, message, addr): - """ - 处理数据消息 - :param message: 数据消息 - :param addr: 客户端地址 - """ - conn_id = message['conn_id'] - data = bytes.fromhex(message['data']) - if conn_id in self.active_connections: - # 转发数据到本地服务 - logger.debug(f"收到来自 {addr} 的数据,转发到本地服务") - self.active_connections[conn_id]['local_sock'].sendall(data) - else: - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), addr) - logger.debug(f"收到来自 {addr} 的数据,但未找到对应的连接") - def handle_stop_conn(self, message, _): - """ - 处理停止连接请求 - :param message: 停止连接请求消息 - """ - conn_id = message['conn_id'] - if conn_id in self.active_connections: - self.active_connections[conn_id]['local_sock'].close() - self.active_connections.pop(conn_id, None) + punch_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + punch_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + punch_sock.settimeout(10) + punch_sock.bind(('0.0.0.0', punch_port)) - def handle_stop_client(self, message, addr): - """ - 处理停止客户端请求 - :param message: 停止客户端请求消息 - :param addr: 客户端地址 - """ - client_id = message['client_id'] - for conn_id, conn_info in self.active_connections.items(): - if conn_info['client_id'] == client_id: - conn_info['local_sock'].close() - self.active_connections.pop(conn_id, None) - - def handle_stop_provider(self, message, _): - """ - 处理停止服务提供者请求 - :param message: 停止服务提供者请求消息 - """ - logger.info("收到停止服务提供者请求,正在关闭所有连接...") - for conn_id, conn_info in self.active_connections.items(): - conn_info['local_sock'].close() - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), conn_info['client_addr']) - self.active_connections.clear() - self.running = False - self.udp_sock.close() - self.thread_pool.shutdown(wait=True) - logger.info("服务提供者已停止") - - def handle_connection(self, conn_id, client_addr, client_id, service_name): - """ - 处理来自客户端的连接 - :param conn_id: 连接ID - :param client_addr: 客户端地址 - :param client_id: 客户端ID - :param service_name: 服务名称 - """ - internal_port = self.services[service_name] try: - # 接受本地服务连接 - local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - local_sock.connect(('127.0.0.1', internal_port)) - if not local_sock: - logger.error("无法连接到本地服务") - return + punch_sock.connect(connector_addr) + print("Successfully connected to connector after delay") + threading.Thread( + target=self.handle_connector_connection, + args=(punch_sock, service_name), + daemon=True + ).start() + except socket.error as e: + print(f"Punching failed: {e}") + punch_sock.close() - # 创建与客户端的UDP隧道 - logger.debug(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', internal_port)}") + def handle_connector_connection(self, sock, service_name): + # 处理与客户端的连接,启动心跳机制 + threading.Thread(target=self.send_heartbeats, args=(sock,), daemon=True).start() - # 存储连接 - self.active_connections[conn_id] = { - 'local_sock': local_sock, - 'client_addr': client_addr, - 'client_id': client_id - } - - # 通知客户端连接就绪 - self.udp_sock.sendto(json.dumps({ - 'action': 'connected', - 'client_id': conn_id - }).encode(), client_addr) - - # 启动数据转发 - self.forward_data(conn_id, local_sock, client_addr) - except Exception as e: - logger.error(f"连接失败: {str(e)}") - self.udp_sock.sendto(json.dumps({ - 'action': 'connect_failed', - 'client_id': conn_id, - 'message': str(e) - }).encode(), client_addr) - - def forward_data(self, conn_id, local_sock, client_addr): - """ - 转发TCP数据到UDP隧道 - :param conn_id: 连接ID - :param local_sock: 本地服务套接字 - :param client_addr: 客户端地址 - """ try: while True: - # 从本地服务读取数据 - data = local_sock.recv(4096) - if not data: + # 接收连接头信息 + header = sock.recv(5) + if not header: break - # 通过UDP发送给客户端 - self.udp_sock.sendto(json.dumps({ - 'action': 'data', - 'conn_id': conn_id, - 'data': data.hex() # 十六进制编码二进制数据 - }).encode(), client_addr) - logger.debug(f"转发数据给客户端{client_addr}") - except Exception as e: - logger.error(f"转发数据失败: {str(e)}") + conn_id, data_len = struct.unpack("!I B", header) + data = sock.recv(data_len) if data_len > 0 else b'' + + if not data: + with self.lock: + if conn_id in self.connections: + self.connections[conn_id].close() + del self.connections[conn_id] + continue + + with self.lock: + if conn_id not in self.connections: + service_port = self.service_ports.get(service_name, 22) + service_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + service_sock.connect(('127.0.0.1', service_port)) + self.connections[conn_id] = service_sock + threading.Thread( + target=self.forward_data, + args=(service_sock, sock, conn_id), + daemon=True + ).start() + + self.connections[conn_id].sendall(data) + except ConnectionResetError: + pass finally: - local_sock.close() - if conn_id in self.active_connections: - del self.active_connections[conn_id] - logger.debug(f"连接 {conn_id} 已关闭") + sock.close() + with self.lock: + for conn_id, service_sock in list(self.connections.items()): + service_sock.close() + self.connections.clear() - def udp_listener(self): - """ - 监听UDP消息并处理 - """ - data = None - while self.running: + def forward_data(self, src, dst, conn_id): + # 转发数据 + try: + while True: + data = src.recv(4096) + if not data: + break + header = struct.pack("!I B", conn_id, len(data)) + dst.sendall(header + data) + finally: + src.close() + with self.lock: + if conn_id in self.connections: + del self.connections[conn_id] + + def send_heartbeats(self, sock): + # 发送心跳包以保持连接 + while True: try: - data, addr = self.udp_sock.recvfrom(4096) - logger.debug(f"收到来自 {addr} 的消息: {data}") - message = json.loads(data.decode()) + sock.sendall(b'\x00\x00\x00\x00\x00') # Empty heartbeat + time.sleep(5) + except: + break - # 使用字典映射处理不同消息类型 - action_handlers = { - 'punch': self.handle_punch, - 'punch_check': self.handle_punch, - 'punch_response': self.handle_punch_response, - 'connect': self.handle_connect_request, - 'punch_request': self.handle_punch_request, - 'data': self.handle_data, - 'stop_conn': self.handle_stop_conn, - 'stop_client': self.handle_stop_client, - 'stop_provider': self.handle_stop_provider, - } - - # 提交任务到线程池 - if message.get('action') in action_handlers: - self.thread_pool.submit(action_handlers[message['action']], message, addr) - elif message.get('status') == 'error': - logger.error(f"来自 {addr} 的错误消息: {message}") - elif message.get('status') == 'success': - logger.debug(f"来自 {addr} 的成功消息: {message}") - else: - logger.warning(f"收到未知消息: {message}") - except Exception as e: - logger.error(f"处理UDP消息时发生错误: {str(e)}") - if data: - logger.error(f"无法处理消息: {data}") - - def run(self): - """ - 运行服务提供端 - """ - # 注册服务 - if not self.register_service(): - logger.error("服务注册失败,退出程序") + def start(self): + # 启动提供者,连接到协调器并开始处理请求 + if not self.connect_to_coordinator(): return - # 启动UDP监听线程 - threading.Thread(target=self.udp_listener, daemon=True).start() - - # 启动心跳线程 - self.heartbeat_thread.start() - - # 保持主线程运行 try: - while self.running: - time.sleep(1) - except KeyboardInterrupt: - self.running = False - self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr) - for conn_id, conn_info in self.active_connections.items(): - self.udp_sock.sendto(json.dumps({ - 'action': 'stop_conn', - 'conn_id': conn_id - }).encode(), conn_info['client_addr']) - self.udp_sock.close() - logger.info("服务提供端已停止") + while True: + data = self._recv_json() + if data and data.get('action') == 'punch_request': + self.handle_punch_request(data) + except (ConnectionResetError, json.JSONDecodeError): + print("Disconnected from coordinator") + + def _send_json(self, data): + # 发送JSON数据 + self.coord_conn.sendall(json.dumps(data).encode()) + + def _recv_json(self): + # 接收JSON数据 + data = self.coord_conn.recv(4096) + return json.loads(data.decode()) if data else None -if __name__ == '__main__': - # 配置信息 - COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP - SERVICES = { - 'terraria-jk-2cxht5': 5001, - 'minecraft-jk-ytsvb54u6': 5002, - 'alist-jk-5shf43h6fdg': 5244, - 'ssh-jk-54htrsd324n6': 22 - } - - provider = ServiceProvider(COORDINATOR_ADDR, SERVICES) - provider.run() +if __name__ == "__main__": + provider = Provider(coordinator_host='www.awin-x.top') + provider.start()