import hashlib
import json
import os
import secrets
import socket
import threading
import time
from collections import defaultdict


class Coordinator:
    """Rendezvous server that brokers NAT hole punching between peers.

    Clients talk to the coordinator over a TCP control connection using
    one JSON object per message.  Supported ``action`` values:

    * ``login`` / ``auth`` -- salted SHA-256 challenge/response for the
      ``admin`` account; success yields a token bound to the client IP.
    * ``register_service`` -- a provider advertises a list of service names.
    * ``request_service`` -- a connector asks for a provider of a service;
      both sides' public UDP endpoints are exchanged.

    Public UDP endpoints are learned on a separate UDP socket
    (``gathering_port``): a peer sends ``{"token": ...}`` there and the
    datagram's source address is recorded for that token.
    """

    # Maximum time (seconds) to wait for a peer's UDP punch datagram.
    PUNCH_TIMEOUT = 3.0

    def __init__(self, host='0.0.0.0', port=5000,
                 admin_password='admin_password'):
        """Initialise coordinator state; no sockets are opened here.

        Args:
            host: Address the TCP control socket listens on.
            port: TCP control port.  The UDP gathering socket uses
                ``port + 2``.
            admin_password: Password expected for the ``admin`` account.
        """
        self.host = host
        self.port = port
        self.gathering_port = port + 2
        # token -> (ip, port) as observed on the UDP gathering socket.
        self.punch_addr = {}
        self._admin_password = admin_password
        # Kept for backward compatibility; handle_client derives a fresh
        # salt (and hash) for every connection instead of using these.
        self.salt = secrets.token_hex(8)
        self.stored_hash = self._hash_password(self.salt)
        # token -> {'ip': str, 'expiry': float}
        self.tokens = {}
        # token -> {'services': list, 'addr': tuple, 'conn': socket}
        self.services = defaultdict(dict)
        # Connection pool (currently unused).
        self.active_connections = {}
        # Guards self.tokens and self.services.
        self.lock = threading.Lock()
        # Guards self.punch_addr and wakes handlers waiting for an entry.
        self._punch_cond = threading.Condition()

    def _hash_password(self, salt):
        """Return the hex SHA-256 digest of ``salt + admin password``."""
        return hashlib.sha256(
            (salt + self._admin_password).encode()
        ).hexdigest()

    def handle_client(self, conn, addr):
        """Serve one TCP control connection until the peer disconnects.

        Args:
            conn: Connected TCP socket; closed before this method returns.
            addr: ``(ip, port)`` of the peer.
        """
        print(f"New connection from {addr}")
        issued_tokens = set()
        # Per-connection challenge salt and the hash the client must echo.
        salt = secrets.token_hex(8)
        stored_hash = self._hash_password(salt)
        try:
            while True:
                data = self.recv_json(conn)
                if not data:
                    break
                if not isinstance(data, dict):
                    # Valid JSON but not an object: there is no action.
                    self.send_json(conn, {'status': 'error',
                                          'message': 'Invalid request'})
                    continue

                action = data.get('action')
                if action == 'login':
                    self._handle_login(conn, data, salt)
                elif action == 'auth':
                    token = self._handle_auth(conn, addr, data, stored_hash)
                    if token is not None:
                        issued_tokens.add(token)
                elif action == 'register_service':
                    self._handle_register_service(conn, addr, data)
                elif action == 'request_service':
                    self._handle_request_service(conn, addr, data)
                # Unknown actions are ignored without a reply.
        except (OSError, UnicodeDecodeError, json.JSONDecodeError,
                RecursionError):
            # Peer vanished or sent bytes that are not JSON: drop the link.
            pass
        finally:
            conn.close()
            print(f"Connection closed: {addr}")
            self._release_connection(conn, issued_tokens)

    def _handle_login(self, conn, data, salt):
        """Answer a ``login`` request with the connection's salt."""
        if data.get('account') == 'admin':
            self.send_json(conn, {'status': 'salt', 'salt': salt})
        else:
            self.send_json(conn, {'status': 'error',
                                  'message': 'Invalid account'})

    def _handle_auth(self, conn, addr, data, stored_hash):
        """Verify the password hash; return the new token or ``None``."""
        supplied = data.get('hash')
        # Constant-time comparison; bytes are used so that non-ASCII
        # input cannot make compare_digest raise TypeError.
        authenticated = isinstance(supplied, str) and secrets.compare_digest(
            supplied.encode(), stored_hash.encode()
        )
        if not authenticated:
            self.send_json(conn, {'status': 'error',
                                  'message': 'Authentication failed'})
            return None

        token = secrets.token_hex(8)
        with self.lock:
            self.tokens[token] = {
                'ip': addr[0],
                'expiry': time.time() + 3600,  # valid for one hour
            }
        self.send_json(conn, {'status': 'success', 'token': token,
                              'message': 'Login successful'})
        return token

    def _handle_register_service(self, conn, addr, data):
        """Record the services offered by the holder of a valid token."""
        connector_token = data.get('token')
        if not self.validate_token(connector_token, addr[0]):
            self.send_json(conn, {'status': 'error',
                                  'message': 'Invalid token'})
            return

        services = data.get('services', [])
        if not isinstance(services, list):
            # A string here would turn lookups into substring matches.
            self.send_json(conn, {'status': 'error',
                                  'message': 'Invalid services'})
            return

        with self.lock:
            self.services[connector_token] = {
                'services': services,  # advertised service names
                'addr': addr,          # TCP peer address
                'conn': conn,          # control socket used for punch requests
            }
        self.send_json(conn, {'status': 'success',
                              'message': 'Services registered'})

    def _handle_request_service(self, conn, addr, data):
        """Pair a connector with a provider and exchange punch addresses."""
        connector_token = data.get('token')
        if not self.validate_token(connector_token, addr[0]):
            self.send_json(conn, {'status': 'error',
                                  'message': 'Invalid token'})
            return

        service_name = data.get('service_name')
        provider_token = self.find_service_provider(service_name)
        provider_info = None
        if provider_token is not None:
            with self.lock:
                # .get avoids creating an empty defaultdict entry if the
                # provider disconnected since the lookup above.
                provider_info = self.services.get(provider_token)
        if not provider_info:
            self.send_json(conn, {'status': 'error',
                                  'message': 'Service not available'})
            return

        connector_addr = self._wait_for_punch_addr(
            connector_token, self.PUNCH_TIMEOUT
        )
        if connector_addr is None:
            print("Timeout waiting for connector to respond")
            self.send_json(conn, {
                'status': 'error',
                'message': 'Timed out waiting for punch address',
            })
            return

        # Ask the provider to start punching towards the connector.
        punch_msg = {
            'action': 'punch_request',
            'connector_addr': connector_addr,
            'service_name': service_name,
        }
        try:
            self.send_json(provider_info['conn'], punch_msg)
        except OSError:
            # Provider's control socket is dead: forget the registration.
            with self.lock:
                if self.services.get(provider_token) is provider_info:
                    del self.services[provider_token]
            self.send_json(conn, {'status': 'error',
                                  'message': 'Service not available'})
            return

        provider_addr = self._wait_for_punch_addr(
            provider_token, self.PUNCH_TIMEOUT
        )
        if provider_addr is None:
            print("Timeout waiting for provider to respond")
            self.send_json(conn, {
                'status': 'error',
                'message': 'Timed out waiting for punch address',
            })
            return

        self.send_json(conn, {'status': 'success',
                              'provider_addr': provider_addr})
        # The connector's token is single-use: destroy it after success.
        with self.lock:
            self.tokens.pop(connector_token, None)

    def _release_connection(self, conn, issued_tokens):
        """Drop tokens issued on ``conn`` and registrations that use it."""
        with self.lock:
            for token in issued_tokens:
                self.tokens.pop(token, None)
                self.services.pop(token, None)
            # Registrations made with a token from another connection
            # would otherwise keep pointing at this closed socket.
            stale = [
                token for token, info in self.services.items()
                if info.get('conn') is conn
            ]
            for token in stale:
                del self.services[token]

    def _wait_for_punch_addr(self, token, timeout):
        """Wait up to ``timeout`` seconds for ``token``'s UDP endpoint.

        Returns:
            The recorded ``(ip, port)`` (removed from ``punch_addr``), or
            ``None`` if nothing arrived in time.
        """
        with self._punch_cond:
            arrived = self._punch_cond.wait_for(
                lambda: token in self.punch_addr, timeout
            )
            if not arrived:
                return None
            return self.punch_addr.pop(token)

    def _record_punch_addr(self, data, addr):
        """Store ``addr`` for the token carried in a UDP datagram.

        Malformed datagrams and invalid tokens are ignored.

        Returns:
            ``True`` if the address was recorded, ``False`` otherwise.
        """
        try:
            message = json.loads(data.decode())
        except (UnicodeDecodeError, json.JSONDecodeError, RecursionError):
            return False
        if not isinstance(message, dict):
            return False

        token = message.get('token')
        if not self.validate_token(token, addr[0]):
            return False

        with self._punch_cond:
            self.punch_addr[token] = addr
            self._punch_cond.notify_all()
        return True

    def validate_token(self, token, ip):
        """Return ``True`` if ``token`` exists, matches ``ip``, and is unexpired."""
        if not isinstance(token, str):
            # Client-supplied JSON may be a list/dict, which is unhashable.
            return False
        with self.lock:
            token_info = self.tokens.get(token)
            return bool(
                token_info
                and token_info['ip'] == ip
                and token_info['expiry'] > time.time()
            )

    def find_service_provider(self, service_name):
        """Return the token of the first provider offering ``service_name``.

        Returns ``None`` when no registered provider lists the service.
        """
        # Hold the lock: handler threads add/remove registrations.
        with self.lock:
            for token, info in self.services.items():
                if service_name in info.get('services', ()):
                    return token
        return None

    def recv_json(self, conn):
        """Receive one JSON message from ``conn``.

        A single ``recv`` of up to 4096 bytes is treated as one complete
        message (there is no framing).

        Returns:
            The decoded JSON value, or ``None`` if the peer closed the
            connection.

        Raises:
            json.JSONDecodeError, UnicodeDecodeError: payload is not JSON.
        """
        data = conn.recv(4096)
        if not data:
            return None
        return json.loads(data.decode())

    def send_json(self, conn, data):
        """Serialise ``data`` as JSON and send it on ``conn``."""
        conn.sendall(json.dumps(data).encode())

    def punch_port_gathering(self):
        """Run the UDP listener that learns peers' public endpoints.

        Binds ``0.0.0.0:gathering_port`` and loops forever; each datagram
        of the form ``{"token": ...}`` with a valid token records the
        sender's address for that token.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as gathering_socket:
            gathering_socket.bind(('0.0.0.0', self.gathering_port))
            print(f"Starting punch port gathering on {gathering_socket.getsockname()}")
            while True:
                try:
                    data, addr = gathering_socket.recvfrom(4096)
                except ConnectionResetError:
                    # NOTE(review): presumably the Windows behaviour where
                    # an ICMP error surfaces on a UDP socket; keep serving.
                    continue
                print(f"Received punch port from {addr}")
                self._record_punch_addr(data, addr)

    def start(self):
        """Start the UDP gathering thread and serve TCP clients forever.

        Each accepted connection is handled on its own daemon thread.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self.host, self.port))
            server.listen(5)
            print(f"Coordinator listening on {self.host}:{self.port}")

            gathering_thread = threading.Thread(
                target=self.punch_port_gathering,
                daemon=True,
            )
            gathering_thread.start()

            while True:
                conn, addr = server.accept()
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(conn, addr),
                    daemon=True,
                )
                client_thread.start()


if __name__ == "__main__":
    coordinator = Coordinator()
    coordinator.start()