From bb3be9c41b0dd20a98699685ed235b45e738ab20 Mon Sep 17 00:00:00 2001 From: awinx Date: Sat, 31 May 2025 15:28:20 +0800 Subject: [PATCH] tcp --- connector.py | 43 ++++++++++----------- coordinator.py | 100 +++++++++++++++---------------------------------- provider.py | 33 ++++++++-------- 3 files changed, 68 insertions(+), 108 deletions(-) diff --git a/connector.py b/connector.py index 12da936..a6e59b6 100644 --- a/connector.py +++ b/connector.py @@ -7,19 +7,18 @@ import time class Connector: - def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000, local_port=2222): - self.coordinator_host = coordinator_host - self.coordinator_port = coordinator_port - self.local_port = local_port + def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000): + self.coordinator_addr = (coordinator_host, coordinator_port) + self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.coord_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.coord_conn.connect(self.coordinator_addr) + self.local_port = self.coord_conn.getsockname()[1] self.token = None self.connections = {} self.conn_counter = 0 self.lock = threading.Lock() def connect_to_coordinator(self): - self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.coord_conn.connect((self.coordinator_host, self.coordinator_port)) - # Login self._send_json({'action': 'login', 'account': 'admin'}) response = self._recv_json() @@ -38,36 +37,38 @@ class Connector: return False def request_service(self, service_name): - punch_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - punch_socket.bind(('0.0.0.0', 0)) - punch_port = punch_socket.getsockname()[1] - gathering_port = self.coordinator_port+2 - print(f'punch socket send to {(self.coordinator_host, gathering_port)}') - punch_socket.sendto(json.dumps({'token': self.token}).encode(), (self.coordinator_host, gathering_port)) - self._send_json({ 'action': 'request_service', 'service_name': service_name, 'token': self.token }) - time.sleep(3) response = self._recv_json() if response.get('status') == 'success': provider_addr = tuple(response['provider_addr']) print(f"Connecting to provider at {provider_addr}") + punch_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + punch_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + punch_socket.bind(('0.0.0.0', self.local_port)) # 向对方发送打洞包 - for i in range(20): + for i in range(10): punch_socket.sendto(b'pong pong pong pong', provider_addr) time.sleep(0.2) + punch_socket.close() + + try: + punch_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + punch_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + punch_socket.connect(provider_addr) + except: + print("tcp 打洞") - # Start listening for incoming connections from provider listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind(('0.0.0.0', punch_port)) + listener.bind(('0.0.0.0', self.local_port)) listener.listen(5) - print(f"Listening on port {punch_port} for provider connections") + print(f"Listening on port {self.local_port} for provider connections") # Start handler thread to accept provider's connection threading.Thread( @@ -177,5 +178,5 @@ class Connector: if __name__ == "__main__": - connector = Connector(coordinator_host='www.awin-x.top',local_port=2222) - connector.start(service_name='ssh') \ No newline at end of file + connector = Connector(coordinator_host='www.awin-x.top') + connector.start(service_name='ssh') diff --git a/coordinator.py b/coordinator.py index bedeac9..95dc36e 100644 --- a/coordinator.py +++ b/coordinator.py @@ -8,13 +8,24 @@ import time from collections import defaultdict +def send_json(conn, data): + # 发送JSON数据 + conn.sendall(json.dumps(data).encode()) + + +def recv_json(conn): + # 接收并解析JSON数据 + data = conn.recv(4096) + if not data: + return None + return json.loads(data.decode()) + + class Coordinator: def __init__(self, host='0.0.0.0', port=5000): # 初始化协调器服务端参数 self.host = host # 监听地址 self.port = port # 监听端口 - self.gathering_port = port + 2 - self.punch_addr = defaultdict() # 生成盐值用于密码加密 self.salt = secrets.token_hex(8) # 存储管理员密码哈希值(盐+密码) @@ -38,7 +49,7 @@ class Coordinator: try: while True: # 接收客户端JSON数据 - data = self.recv_json(conn) + data = recv_json(conn) if not data: break @@ -48,9 +59,9 @@ class Coordinator: if action == 'login': if data.get('account') == 'admin': response = {'status': 'salt', 'salt': salt} - self.send_json(conn, response) + send_json(conn, response) else: - self.send_json(conn, {'status': 'error', 'message': 'Invalid account'}) + send_json(conn, {'status': 'error', 'message': 'Invalid account'}) # 认证流程:验证密码哈希 elif action == 'auth': @@ -59,13 +70,13 @@ class Coordinator: token = secrets.token_hex(8) with self.lock: self.tokens[token] = { - 'ip': addr[0], + 'ip': addr[0], 'expiry': time.time() + 3600 # 令牌过期时间 } response = {'status': 'success', 'token': token, 'message': 'Login successful'} - self.send_json(conn, response) + send_json(conn, response) else: - self.send_json(conn, {'status': 'error', 'message': 'Authentication failed'}) + send_json(conn, {'status': 'error', 'message': 'Authentication failed'}) # 服务注册流程 elif action == 'register_service': @@ -75,18 +86,18 @@ class Coordinator: with self.lock: self.services[connector_token] = { 'services': services, # 支持的服务列表 - 'addr': addr, # 客户端地址信息 - 'conn': conn # 客户端连接套接字 + 'addr': addr, # 客户端地址信息 + 'conn': conn # 客户端连接套接字 } - self.send_json(conn, {'status': 'success', 'message': 'Services registered'}) + send_json(conn, {'status': 'success', 'message': 'Services registered'}) else: - self.send_json(conn, {'status': 'error', 'message': 'Invalid token'}) + send_json(conn, {'status': 'error', 'message': 'Invalid token'}) # 服务请求流程 elif action == 'request_service': connector_token = data.get('token') if not self.validate_token(connector_token, addr[0]): - self.send_json(conn, {'status': 'error', 'message': 'Invalid token'}) + send_json(conn, {'status': 'error', 'message': 'Invalid token'}) continue service_name = data.get('service_name') @@ -97,43 +108,27 @@ class Coordinator: provider_addr = provider_info['addr'] connector_addr = addr count = 0 - while connector_token not in self.punch_addr and count < 11: - time.sleep(0.3) - count += 1 - if count >= 10: - print("Timeout waiting for connector to respond") - continue # 通知服务提供方进行NAT打洞 punch_msg = { 'action': 'punch_request', - 'connector_addr': self.punch_addr[connector_token], - 'service_name': service_name # 请求的服务名称 + 'connector_addr': connector_addr, + 'service_name': service_name # 请求的服务名称 } - self.punch_addr.pop(connector_token) - self.send_json(provider_info['conn'], punch_msg) - - count = 0 - while provider_token not in self.punch_addr and count < 11: - time.sleep(0.3) - count += 1 - if count >= 10: - print("Timeout waiting for provider to respond") - continue + send_json(provider_info['conn'], punch_msg) # 响应请求方 - self.send_json(conn, { + send_json(conn, { 'status': 'success', - 'provider_addr': self.punch_addr[provider_token] + 'provider_addr': provider_addr }) - self.punch_addr.pop(provider_token) # 使用后立即销毁令牌 with self.lock: if connector_token in self.tokens: del self.tokens[connector_token] else: - self.send_json(conn, {'status': 'error', 'message': 'Service not available'}) + send_json(conn, {'status': 'error', 'message': 'Service not available'}) except (ConnectionResetError, json.JSONDecodeError): pass finally: @@ -162,34 +157,6 @@ class Coordinator: return token return None - def recv_json(self, conn): - # 接收并解析JSON数据 - data = conn.recv(4096) - if not data: - return None - return json.loads(data.decode()) - - def send_json(self, conn, data): - # 发送JSON数据 - conn.sendall(json.dumps(data).encode()) - - def punch_port_gathering(self): - gathering_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # 绑定到相同的本地端口(用于后续TCP连接) - gathering_socket.bind(('0.0.0.0', self.gathering_port)) - print(f"Starting punch port gathering on {gathering_socket.getsockname()}") - while True: - data, addr = gathering_socket.recvfrom(4096) - print(f"Received punch port from {addr}") - try: - token = json.loads(data.decode()).get('token') - if self.validate_token(token, addr[0]): - self.punch_addr[token] = addr - except (ConnectionResetError, json.JSONDecodeError): - pass - - - def start(self): # 启动协调器服务 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -198,11 +165,6 @@ class Coordinator: server.listen(5) print(f"Coordinator listening on {self.host}:{self.port}") - gathering_thread = threading.Thread( - target=self.punch_port_gathering, - daemon=True - ) - gathering_thread.start() while True: conn, addr = server.accept() # 为每个连接创建独立线程 @@ -216,4 +178,4 @@ class Coordinator: if __name__ == "__main__": coordinator = Coordinator() - coordinator.start() \ No newline at end of file + coordinator.start() diff --git a/provider.py b/provider.py index 85014be..0565d05 100644 --- a/provider.py +++ b/provider.py @@ -8,10 +8,14 @@ import struct # 定义 Provider 类,用于处理与协调器的连接和P2P通信 class Provider: - def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000): + def __init__(self, coordinator_host='www.awin-x.top', coordinator_port=5000): # 初始化协调器的主机和端口 - self.coordinator_host = coordinator_host - self.coordinator_port = coordinator_port + self.coordinator_addr = (coordinator_host, coordinator_port) + # 创建与协调器的TCP连接 + self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.coord_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.coord_conn.connect(self.coordinator_addr) + self.local_port = self.coord_conn.getsockname()[1] # 用于存储认证令牌 self.token = None # 定义可提供的服务及其默认端口 @@ -22,10 +26,6 @@ class Provider: self.lock = threading.Lock() def connect_to_coordinator(self): - # 创建与协调器的TCP连接 - self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.coord_conn.connect((self.coordinator_host, self.coordinator_port)) - # 发送登录请求 self._send_json({'action': 'login', 'account': 'admin'}) response = self._recv_json() @@ -56,27 +56,24 @@ class Provider: def handle_punch_request(self, data): connector_addr = tuple(data['connector_addr']) - udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - udp_socket.bind(('0.0.0.0', 0)) - punch_port = udp_socket.getsockname()[1] - print(f"UDP punching using {punch_port}") - udp_socket.sendto(json.dumps({ - 'token': self.token - }).encode(), (self.coordinator_host, self.coordinator_port + 2)) service_name = data['service_name'] print(f"Punching hole to connector at {connector_addr}, waiting 10 seconds...") + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + udp_socket.bind(('0.0.0.0', self.local_port)) # 向对方发送打洞包 - for i in range(20): + for i in range(10): udp_socket.sendto(b'punch punch punch punch', connector_addr) time.sleep(0.2) + udp_socket.close() - time.sleep(10) + time.sleep(3) punch_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) punch_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) punch_sock.settimeout(10) - punch_sock.bind(('0.0.0.0', punch_port)) + punch_sock.bind(('0.0.0.0', self.local_port)) try: punch_sock.connect(connector_addr) @@ -181,5 +178,5 @@ class Provider: if __name__ == "__main__": - provider = Provider(coordinator_host='www.awin-x.top') + provider = Provider() provider.start()