From bcab343fe6a3dec9c7defa724395caebaad714cd Mon Sep 17 00:00:00 2001 From: awinx Date: Sun, 1 Jun 2025 16:19:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E9=AB=98=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connector.py | 182 ------------------------------------------------- coordinator.py | 181 ------------------------------------------------ provider.py | 182 ------------------------------------------------- server.py | 155 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 545 deletions(-) delete mode 100644 connector.py delete mode 100644 coordinator.py delete mode 100644 provider.py create mode 100644 server.py diff --git a/connector.py b/connector.py deleted file mode 100644 index a6e59b6..0000000 --- a/connector.py +++ /dev/null @@ -1,182 +0,0 @@ -import socket -import threading -import json -import hashlib -import struct -import time - - -class Connector: - def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000): - self.coordinator_addr = (coordinator_host, coordinator_port) - self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.coord_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.coord_conn.connect(self.coordinator_addr) - self.local_port = self.coord_conn.getsockname()[1] - self.token = None - self.connections = {} - self.conn_counter = 0 - self.lock = threading.Lock() - - def connect_to_coordinator(self): - # Login - self._send_json({'action': 'login', 'account': 'admin'}) - response = self._recv_json() - - if response.get('status') == 'salt': - salt = response['salt'] - password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest() - self._send_json({'action': 'auth', 'hash': password_hash}) - response = self._recv_json() - - if response.get('status') == 'success': - self.token = response['token'] - print(f"Authenticated. Token: {self.token}") - return True - print("Connection to coordinator failed") - return False - - def request_service(self, service_name): - self._send_json({ - 'action': 'request_service', - 'service_name': service_name, - 'token': self.token - }) - response = self._recv_json() - - if response.get('status') == 'success': - provider_addr = tuple(response['provider_addr']) - print(f"Connecting to provider at {provider_addr}") - - punch_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - punch_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - punch_socket.bind(('0.0.0.0', self.local_port)) - # 向对方发送打洞包 - for i in range(10): - punch_socket.sendto(b'pong pong pong pong', provider_addr) - time.sleep(0.2) - punch_socket.close() - - try: - punch_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - punch_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - punch_socket.connect(provider_addr) - except: - print("tcp 打洞") - - listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind(('0.0.0.0', self.local_port)) - listener.listen(5) - print(f"Listening on port {self.local_port} for provider connections") - - # Start handler thread to accept provider's connection - threading.Thread( - target=self.handle_provider_connection, - args=(listener, service_name), - daemon=True - ).start() - return True - print("Failed to request service") - return False - - def handle_provider_connection(self, listener, service_name): - # Accept connection from provider - try: - provider_sock, addr = listener.accept() - print(f"Accepted provider connection from {addr}") - # Start heartbeat monitoring - threading.Thread( - target=self.monitor_heartbeats, - args=(provider_sock,), - daemon=True - ).start() - - # Start client listener to accept local clients - client_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - client_listener.bind(('0.0.0.0', self.local_port)) - client_listener.listen(5) - print(f"Client listener started on port {self.local_port}") - - try: - while True: - client_sock, addr = client_listener.accept() - print(f"New client from {addr}") - with self.lock: - conn_id = self.conn_counter - self.conn_counter += 1 - - threading.Thread( - target=self.handle_client_connection, - args=(client_sock, provider_sock, conn_id), - daemon=True - ).start() - finally: - client_listener.close() - finally: - listener.close() - - def handle_client_connection(self, client_sock, provider_sock, conn_id): - self.connections[conn_id] = client_sock - - try: - while True: - data = client_sock.recv(4096) - if not data: - break - header = struct.pack("!I B", conn_id, len(data)) - provider_sock.sendall(header + data) - finally: - client_sock.close() - with self.lock: - if conn_id in self.connections: - del self.connections[conn_id] - - def monitor_heartbeats(self, sock): - last_heartbeat = time.time() - while True: - try: - header = sock.recv(5) - if not header: - break - - conn_id, data_len = struct.unpack("!I B", header) - data = sock.recv(data_len) if data_len > 0 else b'' - - # Check if heartbeat - if conn_id == 0 and data_len == 0: - last_heartbeat = time.time() - continue - - # Forward data to client - with self.lock: - if conn_id in self.connections: - self.connections[conn_id].sendall(data) - except ConnectionResetError: - break - - # Check heartbeat timeout - if time.time() - last_heartbeat > 10: - print("Heartbeat timeout") - break - - def start(self, service_name='ssh'): - if not self.connect_to_coordinator(): - return - - if self.request_service(service_name): - while True: - time.sleep(1) - - def _send_json(self, data): - self.coord_conn.sendall(json.dumps(data).encode()) - - def _recv_json(self): - data = self.coord_conn.recv(4096) - return json.loads(data.decode()) if data else None - - -if __name__ == "__main__": - connector = Connector(coordinator_host='www.awin-x.top') - connector.start(service_name='ssh') diff --git a/coordinator.py b/coordinator.py deleted file mode 100644 index 95dc36e..0000000 --- a/coordinator.py +++ /dev/null @@ -1,181 +0,0 @@ -import socket -import threading -import json -import os -import hashlib -import secrets -import time -from collections import defaultdict - - -def send_json(conn, data): - # 发送JSON数据 - conn.sendall(json.dumps(data).encode()) - - -def recv_json(conn): - # 接收并解析JSON数据 - data = conn.recv(4096) - if not data: - return None - return json.loads(data.decode()) - - -class Coordinator: - def __init__(self, host='0.0.0.0', port=5000): - # 初始化协调器服务端参数 - self.host = host # 监听地址 - self.port = port # 监听端口 - # 生成盐值用于密码加密 - self.salt = secrets.token_hex(8) - # 存储管理员密码哈希值(盐+密码) - self.stored_hash = hashlib.sha256((self.salt + "admin_password").encode()).hexdigest() - # 存储用户令牌信息 - self.tokens = {} - # 存储服务注册信息,格式:{token: {services: [], addr: (), conn: socket}} - self.services = defaultdict(dict) - # 活动连接池(当前未使用) - self.active_connections = {} - # 线程锁保证数据安全 - self.lock = threading.Lock() - - def handle_client(self, conn, addr): - # 处理客户端连接 - print(f"New connection from {addr}") - token = None - salt = secrets.token_hex(8) - stored_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest() - - try: - while True: - # 接收客户端JSON数据 - data = recv_json(conn) - if not data: - break - - action = data.get('action') - - # 登录流程:发送盐值 - if action == 'login': - if data.get('account') == 'admin': - response = {'status': 'salt', 'salt': salt} - send_json(conn, response) - else: - send_json(conn, {'status': 'error', 'message': 'Invalid account'}) - - # 认证流程:验证密码哈希 - elif action == 'auth': - if data.get('hash') == stored_hash: - # 生成访问令牌(有效期1小时) - token = secrets.token_hex(8) - with self.lock: - self.tokens[token] = { - 'ip': addr[0], - 'expiry': time.time() + 3600 # 令牌过期时间 - } - response = {'status': 'success', 'token': token, 'message': 'Login successful'} - send_json(conn, response) - else: - send_json(conn, {'status': 'error', 'message': 'Authentication failed'}) - - # 服务注册流程 - elif action == 'register_service': - connector_token = data.get('token') - if self.validate_token(connector_token, addr[0]): - services = data.get('services', []) - with self.lock: - self.services[connector_token] = { - 'services': services, # 支持的服务列表 - 'addr': addr, # 客户端地址信息 - 'conn': conn # 客户端连接套接字 - } - send_json(conn, {'status': 'success', 'message': 'Services registered'}) - else: - send_json(conn, {'status': 'error', 'message': 'Invalid token'}) - - # 服务请求流程 - elif action == 'request_service': - connector_token = data.get('token') - if not self.validate_token(connector_token, addr[0]): - send_json(conn, {'status': 'error', 'message': 'Invalid token'}) - continue - - service_name = data.get('service_name') - provider_token = self.find_service_provider(service_name) - - if provider_token: - provider_info = self.services[provider_token] - provider_addr = provider_info['addr'] - connector_addr = addr - count = 0 - - # 通知服务提供方进行NAT打洞 - punch_msg = { - 'action': 'punch_request', - 'connector_addr': connector_addr, - 'service_name': service_name # 请求的服务名称 - } - send_json(provider_info['conn'], punch_msg) - - # 响应请求方 - send_json(conn, { - 'status': 'success', - 'provider_addr': provider_addr - }) - - # 使用后立即销毁令牌 - with self.lock: - if connector_token in self.tokens: - del self.tokens[connector_token] - else: - send_json(conn, {'status': 'error', 'message': 'Service not available'}) - except (ConnectionResetError, json.JSONDecodeError): - pass - finally: - conn.close() - print(f"Connection closed: {addr}") - # 清理资源 - if token: - with self.lock: - if token in self.tokens: - del self.tokens[token] - if token in self.services: - del self.services[token] - - def validate_token(self, token, ip): - # 验证令牌有效性:存在、IP匹配、未过期 - with self.lock: - token_info = self.tokens.get(token) - if token_info and token_info['ip'] == ip and token_info['expiry'] > time.time(): - return True - return False - - def find_service_provider(self, service_name): - # 查找可用服务提供者 - for token, info in self.services.items(): - if service_name in info['services']: - return token - return None - - def start(self): - # 启动协调器服务 - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server.bind((self.host, self.port)) - server.listen(5) - print(f"Coordinator listening on {self.host}:{self.port}") - - while True: - conn, addr = server.accept() - # 为每个连接创建独立线程 - client_thread = threading.Thread( - target=self.handle_client, - args=(conn, addr), - daemon=True - ) - client_thread.start() - - -if __name__ == "__main__": - coordinator = Coordinator() - coordinator.start() diff --git a/provider.py b/provider.py deleted file mode 100644 index 0565d05..0000000 --- a/provider.py +++ /dev/null @@ -1,182 +0,0 @@ -import socket -import threading -import json -import hashlib -import time -import struct - - -# 定义 Provider 类,用于处理与协调器的连接和P2P通信 -class Provider: - def __init__(self, coordinator_host='www.awin-x.top', coordinator_port=5000): - # 初始化协调器的主机和端口 - self.coordinator_addr = (coordinator_host, coordinator_port) - # 创建与协调器的TCP连接 - self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.coord_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.coord_conn.connect(self.coordinator_addr) - self.local_port = self.coord_conn.getsockname()[1] - # 用于存储认证令牌 - self.token = None - # 定义可提供的服务及其默认端口 - self.service_ports = {'ssh': 22, 'alist': 5244, 'minecraft': 25565} - # 存储连接的客户端 - self.connections = {} - # 用于线程安全操作的锁 - self.lock = threading.Lock() - - def connect_to_coordinator(self): - # 发送登录请求 - self._send_json({'action': 'login', 'account': 'admin'}) - response = self._recv_json() - - # 处理协调器返回的盐值并进行密码哈希验证 - if response.get('status') == 'salt': - salt = response['salt'] - password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest() - self._send_json({'action': 'auth', 'hash': password_hash}) - response = self._recv_json() - - # 如果认证成功,存储令牌并注册服务 - if response.get('status') == 'success': - self.token = response['token'] - print(f"Authenticated. Token: {self.token}") - - self._send_json({ - 'action': 'register_service', - 'services': list(self.service_ports.keys()), - 'token': self.token - }) - response = self._recv_json() - if response.get('status') == 'success': - print("Services registered") - return True - print("Connection to coordinator failed") - return False - - def handle_punch_request(self, data): - connector_addr = tuple(data['connector_addr']) - service_name = data['service_name'] - print(f"Punching hole to connector at {connector_addr}, waiting 10 seconds...") - - udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - udp_socket.bind(('0.0.0.0', self.local_port)) - # 向对方发送打洞包 - for i in range(10): - udp_socket.sendto(b'punch punch punch punch', connector_addr) - time.sleep(0.2) - udp_socket.close() - - time.sleep(3) - - punch_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - punch_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - punch_sock.settimeout(10) - punch_sock.bind(('0.0.0.0', self.local_port)) - - try: - punch_sock.connect(connector_addr) - print("Successfully connected to connector after delay") - threading.Thread( - target=self.handle_connector_connection, - args=(punch_sock, service_name), - daemon=True - ).start() - except socket.error as e: - print(f"Punching failed: {e}") - punch_sock.close() - - def handle_connector_connection(self, sock, service_name): - # 处理与客户端的连接,启动心跳机制 - threading.Thread(target=self.send_heartbeats, args=(sock,), daemon=True).start() - - try: - while True: - # 接收连接头信息 - header = sock.recv(5) - if not header: - break - - conn_id, data_len = struct.unpack("!I B", header) - data = sock.recv(data_len) if data_len > 0 else b'' - - if not data: - with self.lock: - if conn_id in self.connections: - self.connections[conn_id].close() - del self.connections[conn_id] - continue - - with self.lock: - if conn_id not in self.connections: - service_port = self.service_ports.get(service_name, 22) - service_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - service_sock.connect(('127.0.0.1', service_port)) - self.connections[conn_id] = service_sock - threading.Thread( - target=self.forward_data, - args=(service_sock, sock, conn_id), - daemon=True - ).start() - - self.connections[conn_id].sendall(data) - except ConnectionResetError: - pass - finally: - sock.close() - with self.lock: - for conn_id, service_sock in list(self.connections.items()): - service_sock.close() - self.connections.clear() - - def forward_data(self, src, dst, conn_id): - # 转发数据 - try: - while True: - data = src.recv(4096) - if not data: - break - header = struct.pack("!I B", conn_id, len(data)) - dst.sendall(header + data) - finally: - src.close() - with self.lock: - if conn_id in self.connections: - del self.connections[conn_id] - - def send_heartbeats(self, sock): - # 发送心跳包以保持连接 - while True: - try: - sock.sendall(b'\x00\x00\x00\x00\x00') # Empty heartbeat - time.sleep(5) - except: - break - - def start(self): - # 启动提供者,连接到协调器并开始处理请求 - if not self.connect_to_coordinator(): - return - - try: - while True: - data = self._recv_json() - if data and data.get('action') == 'punch_request': - self.handle_punch_request(data) - except (ConnectionResetError, json.JSONDecodeError): - print("Disconnected from coordinator") - - def _send_json(self, data): - # 发送JSON数据 - self.coord_conn.sendall(json.dumps(data).encode()) - - def _recv_json(self): - # 接收JSON数据 - data = self.coord_conn.recv(4096) - return json.loads(data.decode()) if data else None - - -if __name__ == "__main__": - provider = Provider() - provider.start() diff --git a/server.py b/server.py new file mode 100644 index 0000000..5c9b916 --- /dev/null +++ b/server.py @@ -0,0 +1,155 @@ +import json +import socket +import struct +import threading +import time + +services = { + 'ssh': 22, + 'alist': 5244, + 'minecraft': 25565 +} +co_server = 'www.awin-x.top' +co_port = 5000 + +tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +tcp_sock.bind(('0.0.0.0', 0)) +tcp_sock.listen(5) +listen_port = tcp_sock.getsockname()[1] +print(f"监听端口: {listen_port}") + + +def pack_data(action_, message_, data_): + # 将字符串编码为UTF-8字节串 + action_bytes = action_.encode('utf-8') + message_bytes = message_.encode('utf-8') + # 使用大端序打包长度信息(4字节无符号整数) + # >I: 大端序无符号整型(4字节) + packed = struct.pack('>I', len(action_bytes)) + packed += action_bytes + packed += struct.pack('>I', len(message_bytes)) + packed += message_bytes + packed += data_ # 直接附加二进制数据 + return packed + + +def unpack_data(packed): + # 解析action长度(前4字节) + action_len = struct.unpack('>I', packed[:4])[0] + offset = 4 + # 提取action字节并解码 + action_bytes = packed[offset:offset + action_len] + action_ = action_bytes.decode('utf-8') + offset += action_len + # 解析message长度(接下来的4字节) + message_len = struct.unpack('>I', packed[offset:offset + 4])[0] + offset += 4 + # 提取message字节并解码 + message_bytes = packed[offset:offset + message_len] + message_ = message_bytes.decode('utf-8') + offset += message_len + # 剩余部分是原始二进制数据 + data_ = packed[offset:] + return action_, message_, data_ + + +def send_data(conn, action_, message_, data_): + packed = pack_data(action_, message_, data_) + conn.sendall(packed) + + +def recv_data(conn): + data_ = conn.recv(4096) + action_, message_, data_ = unpack_data(data_) + # print(f"收到来自 {conn.getpeername()} 的数据: {data.decode()}") + return action_, message_, data_ + + +def handle_hello(addr, message, data): + pass + + +def handle_bye(addr, message, data): + pass + + +def handle_punch_to(addr, message, data): + pass + + +actions = { + 'hello': handle_hello, + 'bye': lambda: print("收到来自 {client_addr} 的断开连接请求"), + 'punch_to': lambda: print("收到来自 {client_addr} 的打洞请求"), +} + + +def heart_beat_thread(): + udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + udp_sock.bind(('0.0.0.0', listen_port)) + heart_beat_pack = pack_data('heart_beat', 'json_data', json.dumps({ + 'services': services, + }).encode()) + while True: + udp_sock.sendto(heart_beat_pack, (co_server, co_port)) + time.sleep(10) + + +def co_server_thread(co_server_sock, co_server_addr): + while True: + try: + action, message, data = recv_data(co_server_sock) + except Exception as e: + print(e) + send_data(co_server_addr, 'error', f"无法解析数据", b'') + break + if action in actions: + actions[action](co_server_addr, message, data) + else: + send_data(co_server_addr, 'error', f"未知操作: {action}", b'') + co_server_sock.close() + + +def client_thread(client_sock, client_addr): + while True: + try: + action, message, data = recv_data(client_sock) + except Exception as e: + print(e) + send_data(client_sock, 'error', f"无法解析数据", b'') + break + if action in actions: + actions[action](client_sock, client_addr, message, data) + else: + send_data(client_sock, 'error', f"未知操作: {action}", b'') + client_sock.close() + + +if __name__ == '__main__': + # 心跳线程 + threading.Thread(target=heart_beat_thread, daemon=True).start() + + # 等待协服务器 + while True: + co_server_sock, co_server_addr = tcp_sock.accept() + action, message, data = recv_data(co_server_sock) + if action == 'hello': + if handle_hello(co_server_addr, message, data) == 'co_server': + print(f'协服务器{co_server_addr}连接成功') + threading.Thread(target=co_server_thread, daemon=True).start() + break + co_server_sock.close() + + # 等待客户端 + while True: + client_sock, client_addr = tcp_sock.accept() + action, message, data = recv_data(client_sock) + print(f"收到来自 {client_addr} 的连接") + if action == 'hello': + if handle_hello(co_server_addr, message, data) == 'client': + print(f"确认客户端 {client_addr}") + threading.Thread(target=client_thread, daemon=True).start() + break + client_sock.close()