From a174227fd409302edc57831c28ab558dbe1fa6d1 Mon Sep 17 00:00:00 2001 From: awinx Date: Fri, 30 May 2025 20:38:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E7=89=88=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/AwinSimpleP2P.iml | 2 +- .idea/misc.xml | 5 +- connector1.py | 213 ++++++++++++++++++++++++++++++++++++++++ coordinator.py | 137 ++++++++++++++++++++++++++ provider1.py | 200 +++++++++++++++++++++++++++++++++++++ 5 files changed, 555 insertions(+), 2 deletions(-) create mode 100644 connector1.py create mode 100644 coordinator.py create mode 100644 provider1.py diff --git a/.idea/AwinSimpleP2P.iml b/.idea/AwinSimpleP2P.iml index 2c80e12..2709703 100644 --- a/.idea/AwinSimpleP2P.iml +++ b/.idea/AwinSimpleP2P.iml @@ -4,7 +4,7 @@ - + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 7e0e2b6..b3e4665 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,7 @@ - + + + \ No newline at end of file diff --git a/connector1.py b/connector1.py new file mode 100644 index 0000000..373f70a --- /dev/null +++ b/connector1.py @@ -0,0 +1,213 @@ +import socket +import json +import threading +import time +import uuid + + +class ServiceConnector: + def __init__(self, coordinator_addr, service_name, local_port): + self.target_id = None + self.coordinator_addr = coordinator_addr + self.service_name = service_name + self.local_port = local_port + self.client_id = f"connector-{uuid.uuid4().hex[:8]}" + + # 创建UDP套接字用于协调通信 + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_sock.bind(('0.0.0.0', 0)) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + + # 创建TCP套接字用于本地监听 + self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.tcp_sock.bind(('127.0.0.1', local_port)) + self.tcp_sock.listen(5) + print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'") + + # 存储活动连接 + self.active_connections = {} + self.provider_addr = None + self.internal_port = None + self.running = True + + def request_service(self): + """向协调服务器请求服务""" + message = { + 'action': 'request', + 'service_name': self.service_name, + 'client_id': self.client_id + } + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + + # 等待响应 + data, _ = self.udp_sock.recvfrom(4096) + response = json.loads(data.decode()) + if response['status'] == 'success': + self.provider_addr = tuple(response['provider_addr']) + self.internal_port = response['internal_port'] + self.target_id = response['target_id'] + print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}") + return True + else: + print(f"服务请求失败: {response['message']}") + return False + + def punch_hole(self): + """执行UDP打洞""" + if not self.provider_addr: + return False + + # 请求打洞 + message = { + 'action': 'punch_request', + 'client_id': self.client_id, + 'target_id': self.target_id + } + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + + # 等待协调服务器响应 + data, _ = self.udp_sock.recvfrom(4096) + response = json.loads(data.decode()) + if response['status'] != 'success': + print(f"打洞请求失败: {response['message']}") + return False + + # 向服务提供者发送打洞包 + print(f"尝试打洞到 {self.provider_addr}...") + for _ in range(5): + self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr) + time.sleep(0.5) + + # 检查连通性 + self.udp_sock.settimeout(2.0) + try: + self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr) + data, addr = self.udp_sock.recvfrom(1024) + if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr: + print("打洞成功! 已建立UDP连接") + return True + else: + print(f"错误的打洞响应{data}") + return False + except socket.timeout: + print("打洞失败: 未收到响应") + return False + finally: + self.udp_sock.settimeout(None) + + def udp_listener(self): + """监听UDP消息""" + while self.running: + try: + data, addr = self.udp_sock.recvfrom(4096) + message = json.loads(data.decode()) + + if message['action'] == 'punch_response': + # 打洞响应 - 确认连通性 + print(f"收到来自 {addr} 的打洞响应") + elif message['action'] == 'data': + if message['conn_id'] in self.active_connections: + # 转发数据到本地客户端 + self.active_connections[message['conn_id']].sendall(bytes.fromhex(message['data'])) + else: + print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接") + except Exception as e: + print(e) + + def tcp_listener(self): + """监听本地TCP连接""" + while self.running: + try: + client_sock, client_addr = self.tcp_sock.accept() + print(f"新的本地连接来自 {client_addr}") + + # 为每个连接生成唯一ID + conn_id = str(uuid.uuid4()) + + # 存储连接 + self.active_connections[conn_id] = client_sock + + # 请求服务提供者建立连接 + self.udp_sock.sendto(json.dumps({ + 'action': 'connect', + 'conn_id': conn_id + }).encode(), self.provider_addr) + + time.sleep(0.5) + + # 启动数据转发线程 + threading.Thread( + target=self.forward_data, + args=(conn_id, client_sock), + daemon=True + ).start() + + except: + pass + + def forward_data(self, conn_id, client_sock): + """转发本地TCP数据到UDP隧道""" + CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小 + try: + while True: + # 从本地客户端读取数据 + data = client_sock.recv(4096) + if not data: + break + + # # 通过UDP发送给服务提供者 + # self.udp_sock.sendto(json.dumps({ + # 'action': 'data', + # 'conn_id': conn_id, + # 'data': data.hex() # 十六进制编码二进制数据 + # }).encode(), self.provider_addr) + # 分片发送数据 + for i in range(0, len(data), CHUNK_SIZE): + chunk = data[i:i + CHUNK_SIZE] + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'conn_id': conn_id, + 'data': chunk.hex() + }).encode(), self.provider_addr) + except: + pass + finally: + client_sock.close() + if conn_id in self.active_connections: + del self.active_connections[conn_id] + + def run(self): + """运行服务连接端""" + # 请求服务 + if not self.request_service(): + return + + # 执行打洞 + if not self.punch_hole(): + return + + # 启动UDP监听线程 + threading.Thread(target=self.udp_listener, daemon=True).start() + + # 启动TCP监听线程 + threading.Thread(target=self.tcp_listener, daemon=True).start() + + # 保持主线程运行 + try: + while self.running: + time.sleep(1) + except KeyboardInterrupt: + self.running = False + self.udp_sock.close() + self.tcp_sock.close() + print("服务连接端已停止") + + +if __name__ == '__main__': + # 配置信息 + COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP + SERVICE_NAME = "my_game_server" + LOCAL_PORT = 12345 # 本地映射端口 + + connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT) + connector.run() diff --git a/coordinator.py b/coordinator.py new file mode 100644 index 0000000..9f2fa8c --- /dev/null +++ b/coordinator.py @@ -0,0 +1,137 @@ +import socket +import json +import time +from collections import defaultdict + + +class CoordinatorServer: + def __init__(self, host='0.0.0.0', port=5000): + self.host = host + self.port = port + self.services = {} # 服务名 -> (公网地址, 内网端口) + self.clients = defaultdict(dict) # 客户端ID -> 信息 + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_sock.bind((host, port)) + self.running = True + print(f"协调服务器运行在 {host}:{port}") + + def handle_register(self, data, addr): + """处理服务注册请求""" + try: + service_name = data['service_name'] + internal_port = data['internal_port'] + client_id = data['client_id'] + + # 记录客户端信息 + self.clients[client_id] = { + 'addr': addr, + 'service_name': service_name, + 'internal_port': internal_port, + 'last_seen': time.time() + } + + # 注册服务 + self.services[service_name] = (addr, internal_port) + print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}") + + return {'status': 'success', 'message': '服务注册成功'} + except Exception as e: + return {'status': 'error', 'message': str(e)} + + def handle_request(self, data, addr): + """处理服务请求""" + try: + service_name = data['service_name'] + client_id = data['client_id'] + + if service_name not in self.services: + return {'status': 'error', 'message': '服务未找到'} + + # 记录请求客户端信息 + self.clients[client_id] = { + 'addr': addr, + 'service_name': service_name, + 'last_seen': time.time() + } + + # 获取服务提供者的信息 + provider_addr, internal_port = self.services[service_name] + target_id = [ + client_id + for client_id, info in self.clients.items() + if info['service_name'] == service_name + ][0] + + print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}") + + return { + 'status': 'success', + 'provider_addr': provider_addr, + 'internal_port': internal_port, + 'target_id': target_id + } + except Exception as e: + return {'status': 'error', 'message': str(e)} + + def handle_punch_request(self, data, addr): + """处理打洞请求""" + try: + client_id = data['client_id'] + target_id = data['target_id'] + + # 获取目标客户端信息 + if target_id not in self.clients: + return {'status': 'error', 'message': '目标客户端未找到'} + + target_addr = self.clients[target_id]['addr'] + + print(f"打洞请求: {addr} -> {target_addr}") + + # 通知双方对方的地址 + self.udp_sock.sendto(json.dumps({ + 'action': 'punch_request', + 'client_id': client_id, + 'client_addr': addr + }).encode(), target_addr) + + return { + 'status': 'success', + 'target_addr': target_addr + } + except Exception as e: + return {'status': 'error', 'message': str(e)} + + def run(self): + """运行协调服务器""" + print("协调服务器已启动,等待连接...") + while self.running: + try: + data, addr = self.udp_sock.recvfrom(4096) + try: + message = json.loads(data.decode()) + action = message.get('action') + + if action == 'register': + response = self.handle_register(message, addr) + elif action == 'request': + response = self.handle_request(message, addr) + elif action == 'punch_request': + response = self.handle_punch_request(message, addr) + else: + response = {'status': 'error', 'message': '无效操作'} + + self.udp_sock.sendto(json.dumps(response).encode(), addr) + except json.JSONDecodeError: + self.udp_sock.sendto(json.dumps({ + 'status': 'error', + 'message': '无效的JSON数据' + }).encode(), addr) + except Exception as e: + print(f"服务器错误: {str(e)}") + + self.udp_sock.close() + + +if __name__ == '__main__': + server = CoordinatorServer() + server.run() diff --git a/provider1.py b/provider1.py new file mode 100644 index 0000000..9baa207 --- /dev/null +++ b/provider1.py @@ -0,0 +1,200 @@ +import socket +import json +import threading +import time +import uuid + + +class ServiceProvider: + def __init__(self, coordinator_addr, service_name, internal_port): + self.coordinator_addr = coordinator_addr + self.service_name = service_name + self.internal_port = internal_port + self.client_id = f"provider-{uuid.uuid4().hex[:8]}" + + # 创建UDP套接字用于协调通信 + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_sock.bind(('0.0.0.0', 0)) + self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) + + # # 创建TCP套接字用于服务监听 + # self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # self.tcp_sock.bind(('0.0.0.0', internal_port)) + # self.tcp_sock.listen(5) + print(f"服务端监听在端口 {internal_port}") + + # 存储活动连接 + self.active_connections = {} + self.running = True + + def register_service(self): + """向协调服务器注册服务""" + message = { + 'action': 'register', + 'service_name': self.service_name, + 'internal_port': self.internal_port, + 'external_port': self.udp_sock.getsockname()[1], + 'client_id': self.client_id + } + print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'") + self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr) + + # 等待响应 + data, _ = self.udp_sock.recvfrom(4096) + response = json.loads(data.decode()) + if response['status'] == 'success': + print(f"服务 '{self.service_name}' 注册成功") + else: + print(f"注册失败: {response['message']}") + + def udp_listener(self): + """监听UDP消息""" + while self.running: + try: + data, addr = self.udp_sock.recvfrom(4096) + print(f"收到来自 {addr} 的消息: {data.decode()}") + message = json.loads(data.decode()) + + if message.get('action') == 'punch' or message.get('action') == 'punch_check': + # 打洞请求 - 发送响应以确认连通性 + self.udp_sock.sendto(json.dumps( + {'action': 'punch_response', + 'client_id': message['client_id'] + }).encode(), addr) + print(f"收到来自 {addr} 的打洞请求,已响应") + elif message.get('action') == 'punch_response': + # 打洞响应 - 确认打洞成功 + print(f"收到来自 {addr} 的打洞响应") + elif message.get('action') == 'connect': + # 新的连接请求 + conn_id = message['conn_id'] + print(f"收到来自 {addr} 的连接请求") + threading.Thread( + target=self.handle_connection, + args=(conn_id, addr), + daemon=True + ).start() + elif message.get('action') == 'punch_request': + client_addr = message['client_addr'] + print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞") + self.punch_hole(message) + elif message.get('action') == 'data': + # 接收到来自客户端的数据 + conn_id = message['conn_id'] + data = bytes.fromhex(message['data']) + if conn_id in self.active_connections: + # 转发数据到本地服务 + print(f"收到来自 {addr} 的数据{data}\n,转发到本地服务") + self.active_connections[conn_id]['local_sock'].sendall(data) + else: + print(f"收到来自 {addr} 的数据,但未找到对应的连接") + else: + print(f"收到未知消息: {message}") + except Exception as e: + print(e) + pass + + def handle_connection(self, conn_id, client_addr): + """处理来自客户端的连接""" + try: + # 接受本地服务连接 + local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + local_sock.connect(('127.0.0.1', self.internal_port)) + if not local_sock: + print("无法连接到本地服务") + return + + # 创建与客户端的UDP隧道 + print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}") + + # 存储连接 + self.active_connections[conn_id] = { + 'local_sock': local_sock, + 'client_addr': client_addr + } + + # 通知客户端连接就绪 + self.udp_sock.sendto(json.dumps({ + 'action': 'connected', + 'client_id': conn_id + }).encode(), client_addr) + + # 启动数据转发 + self.forward_data(conn_id, local_sock, client_addr) + except Exception as e: + print(f"连接失败: {str(e)}") + self.udp_sock.sendto(json.dumps({ + 'action': 'connect_failed', + 'client_id': conn_id, + 'message': str(e) + }).encode(), client_addr) + + def forward_data(self, conn_id, local_sock, client_addr): + """转发TCP数据到UDP隧道""" + CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小 + try: + while True: + # 从本地服务读取数据 + data = local_sock.recv(4096) + if not data: + break + + # # 通过UDP发送给客户端 + # self.udp_sock.sendto(json.dumps({ + # 'action': 'data', + # 'conn_id': conn_id, + # 'data': data.hex() # 十六进制编码二进制数据 + # }).encode(), client_addr) + for i in range(0, len(data), CHUNK_SIZE): + chunk = data[i:i + CHUNK_SIZE] + self.udp_sock.sendto(json.dumps({ + 'action': 'data', + 'conn_id': conn_id, + 'data': chunk.hex() + }).encode(), client_addr) + except Exception as e: + print(f"转发数据失败: {str(e)}") + finally: + local_sock.close() + if conn_id in self.active_connections: + del self.active_connections[conn_id] + print(f"连接 {conn_id} 已关闭") + + def run(self): + """运行服务提供端""" + # 注册服务 + self.register_service() + + # 启动UDP监听线程 + threading.Thread(target=self.udp_listener, daemon=True).start() + + # 保持主线程运行 + try: + while self.running: + time.sleep(1) + except KeyboardInterrupt: + self.running = False + self.udp_sock.close() + print("服务提供端已停止") + + def punch_hole(self, message): + for i in range(5): + try: + self.udp_sock.sendto(json.dumps({ + 'action': 'punch', + 'client_id': message['client_id'] + }).encode(), tuple(message['client_addr'])) + time.sleep(2) + except Exception as e: + print(f"打洞失败: {str(e)}") + time.sleep(1) + + +if __name__ == '__main__': + # 配置信息 + COORDINATOR_ADDR = ('127.0.0.1', 5000) # 替换为公网服务器IP + SERVICE_NAME = "my_game_server" + INTERNAL_PORT = 8888 # 内网游戏服务器端口 + + provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT) + provider.run()