diff --git a/.idea/AwinSimpleP2P.iml b/.idea/AwinSimpleP2P.iml
index 2c80e12..2709703 100644
--- a/.idea/AwinSimpleP2P.iml
+++ b/.idea/AwinSimpleP2P.iml
@@ -4,7 +4,7 @@
-
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 7e0e2b6..b3e4665 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
-
+
+
+
+
\ No newline at end of file
diff --git a/connector1.py b/connector1.py
new file mode 100644
index 0000000..373f70a
--- /dev/null
+++ b/connector1.py
@@ -0,0 +1,213 @@
+import socket
+import json
+import threading
+import time
+import uuid
+
+
+class ServiceConnector:
+ def __init__(self, coordinator_addr, service_name, local_port):
+ self.target_id = None
+ self.coordinator_addr = coordinator_addr
+ self.service_name = service_name
+ self.local_port = local_port
+ self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
+
+ # 创建UDP套接字用于协调通信
+ self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.udp_sock.bind(('0.0.0.0', 0))
+ self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
+
+ # 创建TCP套接字用于本地监听
+ self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.tcp_sock.bind(('127.0.0.1', local_port))
+ self.tcp_sock.listen(5)
+ print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
+
+ # 存储活动连接
+ self.active_connections = {}
+ self.provider_addr = None
+ self.internal_port = None
+ self.running = True
+
+ def request_service(self):
+ """向协调服务器请求服务"""
+ message = {
+ 'action': 'request',
+ 'service_name': self.service_name,
+ 'client_id': self.client_id
+ }
+ self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
+
+ # 等待响应
+ data, _ = self.udp_sock.recvfrom(4096)
+ response = json.loads(data.decode())
+ if response['status'] == 'success':
+ self.provider_addr = tuple(response['provider_addr'])
+ self.internal_port = response['internal_port']
+ self.target_id = response['target_id']
+ print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
+ return True
+ else:
+ print(f"服务请求失败: {response['message']}")
+ return False
+
+ def punch_hole(self):
+ """执行UDP打洞"""
+ if not self.provider_addr:
+ return False
+
+ # 请求打洞
+ message = {
+ 'action': 'punch_request',
+ 'client_id': self.client_id,
+ 'target_id': self.target_id
+ }
+ self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
+
+ # 等待协调服务器响应
+ data, _ = self.udp_sock.recvfrom(4096)
+ response = json.loads(data.decode())
+ if response['status'] != 'success':
+ print(f"打洞请求失败: {response['message']}")
+ return False
+
+ # 向服务提供者发送打洞包
+ print(f"尝试打洞到 {self.provider_addr}...")
+ for _ in range(5):
+ self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
+ time.sleep(0.5)
+
+ # 检查连通性
+ self.udp_sock.settimeout(2.0)
+ try:
+ self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
+ data, addr = self.udp_sock.recvfrom(1024)
+ if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
+ print("打洞成功! 已建立UDP连接")
+ return True
+ else:
+ print(f"错误的打洞响应{data}")
+ return False
+ except socket.timeout:
+ print("打洞失败: 未收到响应")
+ return False
+ finally:
+ self.udp_sock.settimeout(None)
+
+ def udp_listener(self):
+ """监听UDP消息"""
+ while self.running:
+ try:
+ data, addr = self.udp_sock.recvfrom(4096)
+ message = json.loads(data.decode())
+
+ if message['action'] == 'punch_response':
+ # 打洞响应 - 确认连通性
+ print(f"收到来自 {addr} 的打洞响应")
+ elif message['action'] == 'data':
+ if message['conn_id'] in self.active_connections:
+ # 转发数据到本地客户端
+ self.active_connections[message['conn_id']].sendall(bytes.fromhex(message['data']))
+ else:
+ print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
+ except Exception as e:
+ print(e)
+
+ def tcp_listener(self):
+ """监听本地TCP连接"""
+ while self.running:
+ try:
+ client_sock, client_addr = self.tcp_sock.accept()
+ print(f"新的本地连接来自 {client_addr}")
+
+ # 为每个连接生成唯一ID
+ conn_id = str(uuid.uuid4())
+
+ # 存储连接
+ self.active_connections[conn_id] = client_sock
+
+ # 请求服务提供者建立连接
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'connect',
+ 'conn_id': conn_id
+ }).encode(), self.provider_addr)
+
+ time.sleep(0.5)
+
+ # 启动数据转发线程
+ threading.Thread(
+ target=self.forward_data,
+ args=(conn_id, client_sock),
+ daemon=True
+ ).start()
+
+ except:
+ pass
+
+ def forward_data(self, conn_id, client_sock):
+ """转发本地TCP数据到UDP隧道"""
+ CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小
+ try:
+ while True:
+ # 从本地客户端读取数据
+ data = client_sock.recv(4096)
+ if not data:
+ break
+
+ # # 通过UDP发送给服务提供者
+ # self.udp_sock.sendto(json.dumps({
+ # 'action': 'data',
+ # 'conn_id': conn_id,
+ # 'data': data.hex() # 十六进制编码二进制数据
+ # }).encode(), self.provider_addr)
+ # 分片发送数据
+ for i in range(0, len(data), CHUNK_SIZE):
+ chunk = data[i:i + CHUNK_SIZE]
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'data',
+ 'conn_id': conn_id,
+ 'data': chunk.hex()
+ }).encode(), self.provider_addr)
+ except:
+ pass
+ finally:
+ client_sock.close()
+ if conn_id in self.active_connections:
+ del self.active_connections[conn_id]
+
+ def run(self):
+ """运行服务连接端"""
+ # 请求服务
+ if not self.request_service():
+ return
+
+ # 执行打洞
+ if not self.punch_hole():
+ return
+
+ # 启动UDP监听线程
+ threading.Thread(target=self.udp_listener, daemon=True).start()
+
+ # 启动TCP监听线程
+ threading.Thread(target=self.tcp_listener, daemon=True).start()
+
+ # 保持主线程运行
+ try:
+ while self.running:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ self.running = False
+ self.udp_sock.close()
+ self.tcp_sock.close()
+ print("服务连接端已停止")
+
+
+if __name__ == '__main__':
+ # 配置信息
+ COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
+ SERVICE_NAME = "my_game_server"
+ LOCAL_PORT = 12345 # 本地映射端口
+
+ connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
+ connector.run()
diff --git a/coordinator.py b/coordinator.py
new file mode 100644
index 0000000..9f2fa8c
--- /dev/null
+++ b/coordinator.py
@@ -0,0 +1,137 @@
+import socket
+import json
+import time
+from collections import defaultdict
+
+
+class CoordinatorServer:
+ def __init__(self, host='0.0.0.0', port=5000):
+ self.host = host
+ self.port = port
+ self.services = {} # 服务名 -> (公网地址, 内网端口)
+ self.clients = defaultdict(dict) # 客户端ID -> 信息
+ self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.udp_sock.bind((host, port))
+ self.running = True
+ print(f"协调服务器运行在 {host}:{port}")
+
+ def handle_register(self, data, addr):
+ """处理服务注册请求"""
+ try:
+ service_name = data['service_name']
+ internal_port = data['internal_port']
+ client_id = data['client_id']
+
+ # 记录客户端信息
+ self.clients[client_id] = {
+ 'addr': addr,
+ 'service_name': service_name,
+ 'internal_port': internal_port,
+ 'last_seen': time.time()
+ }
+
+ # 注册服务
+ self.services[service_name] = (addr, internal_port)
+ print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}")
+
+ return {'status': 'success', 'message': '服务注册成功'}
+ except Exception as e:
+ return {'status': 'error', 'message': str(e)}
+
+ def handle_request(self, data, addr):
+ """处理服务请求"""
+ try:
+ service_name = data['service_name']
+ client_id = data['client_id']
+
+ if service_name not in self.services:
+ return {'status': 'error', 'message': '服务未找到'}
+
+ # 记录请求客户端信息
+ self.clients[client_id] = {
+ 'addr': addr,
+ 'service_name': service_name,
+ 'last_seen': time.time()
+ }
+
+ # 获取服务提供者的信息
+ provider_addr, internal_port = self.services[service_name]
+ target_id = [
+ client_id
+ for client_id, info in self.clients.items()
+ if info['service_name'] == service_name
+ ][0]
+
+ print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
+
+ return {
+ 'status': 'success',
+ 'provider_addr': provider_addr,
+ 'internal_port': internal_port,
+ 'target_id': target_id
+ }
+ except Exception as e:
+ return {'status': 'error', 'message': str(e)}
+
+ def handle_punch_request(self, data, addr):
+ """处理打洞请求"""
+ try:
+ client_id = data['client_id']
+ target_id = data['target_id']
+
+ # 获取目标客户端信息
+ if target_id not in self.clients:
+ return {'status': 'error', 'message': '目标客户端未找到'}
+
+ target_addr = self.clients[target_id]['addr']
+
+ print(f"打洞请求: {addr} -> {target_addr}")
+
+ # 通知双方对方的地址
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'punch_request',
+ 'client_id': client_id,
+ 'client_addr': addr
+ }).encode(), target_addr)
+
+ return {
+ 'status': 'success',
+ 'target_addr': target_addr
+ }
+ except Exception as e:
+ return {'status': 'error', 'message': str(e)}
+
+ def run(self):
+ """运行协调服务器"""
+ print("协调服务器已启动,等待连接...")
+ while self.running:
+ try:
+ data, addr = self.udp_sock.recvfrom(4096)
+ try:
+ message = json.loads(data.decode())
+ action = message.get('action')
+
+ if action == 'register':
+ response = self.handle_register(message, addr)
+ elif action == 'request':
+ response = self.handle_request(message, addr)
+ elif action == 'punch_request':
+ response = self.handle_punch_request(message, addr)
+ else:
+ response = {'status': 'error', 'message': '无效操作'}
+
+ self.udp_sock.sendto(json.dumps(response).encode(), addr)
+ except json.JSONDecodeError:
+ self.udp_sock.sendto(json.dumps({
+ 'status': 'error',
+ 'message': '无效的JSON数据'
+ }).encode(), addr)
+ except Exception as e:
+ print(f"服务器错误: {str(e)}")
+
+ self.udp_sock.close()
+
+
+if __name__ == '__main__':
+ server = CoordinatorServer()
+ server.run()
diff --git a/provider1.py b/provider1.py
new file mode 100644
index 0000000..9baa207
--- /dev/null
+++ b/provider1.py
@@ -0,0 +1,200 @@
+import socket
+import json
+import threading
+import time
+import uuid
+
+
+class ServiceProvider:
+ def __init__(self, coordinator_addr, service_name, internal_port):
+ self.coordinator_addr = coordinator_addr
+ self.service_name = service_name
+ self.internal_port = internal_port
+ self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
+
+ # 创建UDP套接字用于协调通信
+ self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.udp_sock.bind(('0.0.0.0', 0))
+ self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
+
+ # # 创建TCP套接字用于服务监听
+ # self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ # self.tcp_sock.bind(('0.0.0.0', internal_port))
+ # self.tcp_sock.listen(5)
+ print(f"服务端监听在端口 {internal_port}")
+
+ # 存储活动连接
+ self.active_connections = {}
+ self.running = True
+
+ def register_service(self):
+ """向协调服务器注册服务"""
+ message = {
+ 'action': 'register',
+ 'service_name': self.service_name,
+ 'internal_port': self.internal_port,
+ 'external_port': self.udp_sock.getsockname()[1],
+ 'client_id': self.client_id
+ }
+ print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'")
+ self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
+
+ # 等待响应
+ data, _ = self.udp_sock.recvfrom(4096)
+ response = json.loads(data.decode())
+ if response['status'] == 'success':
+ print(f"服务 '{self.service_name}' 注册成功")
+ else:
+ print(f"注册失败: {response['message']}")
+
+ def udp_listener(self):
+ """监听UDP消息"""
+ while self.running:
+ try:
+ data, addr = self.udp_sock.recvfrom(4096)
+ print(f"收到来自 {addr} 的消息: {data.decode()}")
+ message = json.loads(data.decode())
+
+ if message.get('action') == 'punch' or message.get('action') == 'punch_check':
+ # 打洞请求 - 发送响应以确认连通性
+ self.udp_sock.sendto(json.dumps(
+ {'action': 'punch_response',
+ 'client_id': message['client_id']
+ }).encode(), addr)
+ print(f"收到来自 {addr} 的打洞请求,已响应")
+ elif message.get('action') == 'punch_response':
+ # 打洞响应 - 确认打洞成功
+ print(f"收到来自 {addr} 的打洞响应")
+ elif message.get('action') == 'connect':
+ # 新的连接请求
+ conn_id = message['conn_id']
+ print(f"收到来自 {addr} 的连接请求")
+ threading.Thread(
+ target=self.handle_connection,
+ args=(conn_id, addr),
+ daemon=True
+ ).start()
+ elif message.get('action') == 'punch_request':
+ client_addr = message['client_addr']
+ print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
+ self.punch_hole(message)
+ elif message.get('action') == 'data':
+ # 接收到来自客户端的数据
+ conn_id = message['conn_id']
+ data = bytes.fromhex(message['data'])
+ if conn_id in self.active_connections:
+ # 转发数据到本地服务
+ print(f"收到来自 {addr} 的数据{data}\n,转发到本地服务")
+ self.active_connections[conn_id]['local_sock'].sendall(data)
+ else:
+ print(f"收到来自 {addr} 的数据,但未找到对应的连接")
+ else:
+ print(f"收到未知消息: {message}")
+ except Exception as e:
+ print(e)
+ pass
+
+ def handle_connection(self, conn_id, client_addr):
+ """处理来自客户端的连接"""
+ try:
+ # 接受本地服务连接
+ local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ local_sock.connect(('127.0.0.1', self.internal_port))
+ if not local_sock:
+ print("无法连接到本地服务")
+ return
+
+ # 创建与客户端的UDP隧道
+ print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
+
+ # 存储连接
+ self.active_connections[conn_id] = {
+ 'local_sock': local_sock,
+ 'client_addr': client_addr
+ }
+
+ # 通知客户端连接就绪
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'connected',
+ 'client_id': conn_id
+ }).encode(), client_addr)
+
+ # 启动数据转发
+ self.forward_data(conn_id, local_sock, client_addr)
+ except Exception as e:
+ print(f"连接失败: {str(e)}")
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'connect_failed',
+ 'client_id': conn_id,
+ 'message': str(e)
+ }).encode(), client_addr)
+
+ def forward_data(self, conn_id, local_sock, client_addr):
+ """转发TCP数据到UDP隧道"""
+ CHUNK_SIZE = 1472 # 根据 MTU 调整最大分片大小
+ try:
+ while True:
+ # 从本地服务读取数据
+ data = local_sock.recv(4096)
+ if not data:
+ break
+
+ # # 通过UDP发送给客户端
+ # self.udp_sock.sendto(json.dumps({
+ # 'action': 'data',
+ # 'conn_id': conn_id,
+ # 'data': data.hex() # 十六进制编码二进制数据
+ # }).encode(), client_addr)
+ for i in range(0, len(data), CHUNK_SIZE):
+ chunk = data[i:i + CHUNK_SIZE]
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'data',
+ 'conn_id': conn_id,
+ 'data': chunk.hex()
+ }).encode(), client_addr)
+ except Exception as e:
+ print(f"转发数据失败: {str(e)}")
+ finally:
+ local_sock.close()
+ if conn_id in self.active_connections:
+ del self.active_connections[conn_id]
+ print(f"连接 {conn_id} 已关闭")
+
+ def run(self):
+ """运行服务提供端"""
+ # 注册服务
+ self.register_service()
+
+ # 启动UDP监听线程
+ threading.Thread(target=self.udp_listener, daemon=True).start()
+
+ # 保持主线程运行
+ try:
+ while self.running:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ self.running = False
+ self.udp_sock.close()
+ print("服务提供端已停止")
+
+ def punch_hole(self, message):
+ for i in range(5):
+ try:
+ self.udp_sock.sendto(json.dumps({
+ 'action': 'punch',
+ 'client_id': message['client_id']
+ }).encode(), tuple(message['client_addr']))
+ time.sleep(2)
+ except Exception as e:
+ print(f"打洞失败: {str(e)}")
+ time.sleep(1)
+
+
+if __name__ == '__main__':
+ # 配置信息
+ COORDINATOR_ADDR = ('127.0.0.1', 5000) # 替换为公网服务器IP
+ SERVICE_NAME = "my_game_server"
+ INTERNAL_PORT = 8888 # 内网游戏服务器端口
+
+ provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
+ provider.run()