提高稳定性
This commit is contained in:
parent
bb3be9c41b
commit
bcab343fe6
182
connector.py
182
connector.py
@ -1,182 +0,0 @@
|
||||
import socket
|
||||
import threading
|
||||
import json
|
||||
import hashlib
|
||||
import struct
|
||||
import time
|
||||
|
||||
|
||||
class Connector:
|
||||
def __init__(self, coordinator_host='127.0.0.1', coordinator_port=5000):
|
||||
self.coordinator_addr = (coordinator_host, coordinator_port)
|
||||
self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.coord_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.coord_conn.connect(self.coordinator_addr)
|
||||
self.local_port = self.coord_conn.getsockname()[1]
|
||||
self.token = None
|
||||
self.connections = {}
|
||||
self.conn_counter = 0
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def connect_to_coordinator(self):
|
||||
# Login
|
||||
self._send_json({'action': 'login', 'account': 'admin'})
|
||||
response = self._recv_json()
|
||||
|
||||
if response.get('status') == 'salt':
|
||||
salt = response['salt']
|
||||
password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest()
|
||||
self._send_json({'action': 'auth', 'hash': password_hash})
|
||||
response = self._recv_json()
|
||||
|
||||
if response.get('status') == 'success':
|
||||
self.token = response['token']
|
||||
print(f"Authenticated. Token: {self.token}")
|
||||
return True
|
||||
print("Connection to coordinator failed")
|
||||
return False
|
||||
|
||||
def request_service(self, service_name):
|
||||
self._send_json({
|
||||
'action': 'request_service',
|
||||
'service_name': service_name,
|
||||
'token': self.token
|
||||
})
|
||||
response = self._recv_json()
|
||||
|
||||
if response.get('status') == 'success':
|
||||
provider_addr = tuple(response['provider_addr'])
|
||||
print(f"Connecting to provider at {provider_addr}")
|
||||
|
||||
punch_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
punch_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
punch_socket.bind(('0.0.0.0', self.local_port))
|
||||
# 向对方发送打洞包
|
||||
for i in range(10):
|
||||
punch_socket.sendto(b'pong pong pong pong', provider_addr)
|
||||
time.sleep(0.2)
|
||||
punch_socket.close()
|
||||
|
||||
try:
|
||||
punch_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
punch_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
punch_socket.connect(provider_addr)
|
||||
except:
|
||||
print("tcp 打洞")
|
||||
|
||||
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
listener.bind(('0.0.0.0', self.local_port))
|
||||
listener.listen(5)
|
||||
print(f"Listening on port {self.local_port} for provider connections")
|
||||
|
||||
# Start handler thread to accept provider's connection
|
||||
threading.Thread(
|
||||
target=self.handle_provider_connection,
|
||||
args=(listener, service_name),
|
||||
daemon=True
|
||||
).start()
|
||||
return True
|
||||
print("Failed to request service")
|
||||
return False
|
||||
|
||||
def handle_provider_connection(self, listener, service_name):
|
||||
# Accept connection from provider
|
||||
try:
|
||||
provider_sock, addr = listener.accept()
|
||||
print(f"Accepted provider connection from {addr}")
|
||||
# Start heartbeat monitoring
|
||||
threading.Thread(
|
||||
target=self.monitor_heartbeats,
|
||||
args=(provider_sock,),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
# Start client listener to accept local clients
|
||||
client_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
client_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
client_listener.bind(('0.0.0.0', self.local_port))
|
||||
client_listener.listen(5)
|
||||
print(f"Client listener started on port {self.local_port}")
|
||||
|
||||
try:
|
||||
while True:
|
||||
client_sock, addr = client_listener.accept()
|
||||
print(f"New client from {addr}")
|
||||
with self.lock:
|
||||
conn_id = self.conn_counter
|
||||
self.conn_counter += 1
|
||||
|
||||
threading.Thread(
|
||||
target=self.handle_client_connection,
|
||||
args=(client_sock, provider_sock, conn_id),
|
||||
daemon=True
|
||||
).start()
|
||||
finally:
|
||||
client_listener.close()
|
||||
finally:
|
||||
listener.close()
|
||||
|
||||
def handle_client_connection(self, client_sock, provider_sock, conn_id):
|
||||
self.connections[conn_id] = client_sock
|
||||
|
||||
try:
|
||||
while True:
|
||||
data = client_sock.recv(4096)
|
||||
if not data:
|
||||
break
|
||||
header = struct.pack("!I B", conn_id, len(data))
|
||||
provider_sock.sendall(header + data)
|
||||
finally:
|
||||
client_sock.close()
|
||||
with self.lock:
|
||||
if conn_id in self.connections:
|
||||
del self.connections[conn_id]
|
||||
|
||||
def monitor_heartbeats(self, sock):
|
||||
last_heartbeat = time.time()
|
||||
while True:
|
||||
try:
|
||||
header = sock.recv(5)
|
||||
if not header:
|
||||
break
|
||||
|
||||
conn_id, data_len = struct.unpack("!I B", header)
|
||||
data = sock.recv(data_len) if data_len > 0 else b''
|
||||
|
||||
# Check if heartbeat
|
||||
if conn_id == 0 and data_len == 0:
|
||||
last_heartbeat = time.time()
|
||||
continue
|
||||
|
||||
# Forward data to client
|
||||
with self.lock:
|
||||
if conn_id in self.connections:
|
||||
self.connections[conn_id].sendall(data)
|
||||
except ConnectionResetError:
|
||||
break
|
||||
|
||||
# Check heartbeat timeout
|
||||
if time.time() - last_heartbeat > 10:
|
||||
print("Heartbeat timeout")
|
||||
break
|
||||
|
||||
def start(self, service_name='ssh'):
|
||||
if not self.connect_to_coordinator():
|
||||
return
|
||||
|
||||
if self.request_service(service_name):
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
def _send_json(self, data):
|
||||
self.coord_conn.sendall(json.dumps(data).encode())
|
||||
|
||||
def _recv_json(self):
|
||||
data = self.coord_conn.recv(4096)
|
||||
return json.loads(data.decode()) if data else None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
connector = Connector(coordinator_host='www.awin-x.top')
|
||||
connector.start(service_name='ssh')
|
||||
181
coordinator.py
181
coordinator.py
@ -1,181 +0,0 @@
|
||||
import socket
|
||||
import threading
|
||||
import json
|
||||
import os
|
||||
import hashlib
|
||||
import secrets
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
def send_json(conn, data):
|
||||
# 发送JSON数据
|
||||
conn.sendall(json.dumps(data).encode())
|
||||
|
||||
|
||||
def recv_json(conn):
|
||||
# 接收并解析JSON数据
|
||||
data = conn.recv(4096)
|
||||
if not data:
|
||||
return None
|
||||
return json.loads(data.decode())
|
||||
|
||||
|
||||
class Coordinator:
|
||||
def __init__(self, host='0.0.0.0', port=5000):
|
||||
# 初始化协调器服务端参数
|
||||
self.host = host # 监听地址
|
||||
self.port = port # 监听端口
|
||||
# 生成盐值用于密码加密
|
||||
self.salt = secrets.token_hex(8)
|
||||
# 存储管理员密码哈希值(盐+密码)
|
||||
self.stored_hash = hashlib.sha256((self.salt + "admin_password").encode()).hexdigest()
|
||||
# 存储用户令牌信息
|
||||
self.tokens = {}
|
||||
# 存储服务注册信息,格式:{token: {services: [], addr: (), conn: socket}}
|
||||
self.services = defaultdict(dict)
|
||||
# 活动连接池(当前未使用)
|
||||
self.active_connections = {}
|
||||
# 线程锁保证数据安全
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def handle_client(self, conn, addr):
|
||||
# 处理客户端连接
|
||||
print(f"New connection from {addr}")
|
||||
token = None
|
||||
salt = secrets.token_hex(8)
|
||||
stored_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest()
|
||||
|
||||
try:
|
||||
while True:
|
||||
# 接收客户端JSON数据
|
||||
data = recv_json(conn)
|
||||
if not data:
|
||||
break
|
||||
|
||||
action = data.get('action')
|
||||
|
||||
# 登录流程:发送盐值
|
||||
if action == 'login':
|
||||
if data.get('account') == 'admin':
|
||||
response = {'status': 'salt', 'salt': salt}
|
||||
send_json(conn, response)
|
||||
else:
|
||||
send_json(conn, {'status': 'error', 'message': 'Invalid account'})
|
||||
|
||||
# 认证流程:验证密码哈希
|
||||
elif action == 'auth':
|
||||
if data.get('hash') == stored_hash:
|
||||
# 生成访问令牌(有效期1小时)
|
||||
token = secrets.token_hex(8)
|
||||
with self.lock:
|
||||
self.tokens[token] = {
|
||||
'ip': addr[0],
|
||||
'expiry': time.time() + 3600 # 令牌过期时间
|
||||
}
|
||||
response = {'status': 'success', 'token': token, 'message': 'Login successful'}
|
||||
send_json(conn, response)
|
||||
else:
|
||||
send_json(conn, {'status': 'error', 'message': 'Authentication failed'})
|
||||
|
||||
# 服务注册流程
|
||||
elif action == 'register_service':
|
||||
connector_token = data.get('token')
|
||||
if self.validate_token(connector_token, addr[0]):
|
||||
services = data.get('services', [])
|
||||
with self.lock:
|
||||
self.services[connector_token] = {
|
||||
'services': services, # 支持的服务列表
|
||||
'addr': addr, # 客户端地址信息
|
||||
'conn': conn # 客户端连接套接字
|
||||
}
|
||||
send_json(conn, {'status': 'success', 'message': 'Services registered'})
|
||||
else:
|
||||
send_json(conn, {'status': 'error', 'message': 'Invalid token'})
|
||||
|
||||
# 服务请求流程
|
||||
elif action == 'request_service':
|
||||
connector_token = data.get('token')
|
||||
if not self.validate_token(connector_token, addr[0]):
|
||||
send_json(conn, {'status': 'error', 'message': 'Invalid token'})
|
||||
continue
|
||||
|
||||
service_name = data.get('service_name')
|
||||
provider_token = self.find_service_provider(service_name)
|
||||
|
||||
if provider_token:
|
||||
provider_info = self.services[provider_token]
|
||||
provider_addr = provider_info['addr']
|
||||
connector_addr = addr
|
||||
count = 0
|
||||
|
||||
# 通知服务提供方进行NAT打洞
|
||||
punch_msg = {
|
||||
'action': 'punch_request',
|
||||
'connector_addr': connector_addr,
|
||||
'service_name': service_name # 请求的服务名称
|
||||
}
|
||||
send_json(provider_info['conn'], punch_msg)
|
||||
|
||||
# 响应请求方
|
||||
send_json(conn, {
|
||||
'status': 'success',
|
||||
'provider_addr': provider_addr
|
||||
})
|
||||
|
||||
# 使用后立即销毁令牌
|
||||
with self.lock:
|
||||
if connector_token in self.tokens:
|
||||
del self.tokens[connector_token]
|
||||
else:
|
||||
send_json(conn, {'status': 'error', 'message': 'Service not available'})
|
||||
except (ConnectionResetError, json.JSONDecodeError):
|
||||
pass
|
||||
finally:
|
||||
conn.close()
|
||||
print(f"Connection closed: {addr}")
|
||||
# 清理资源
|
||||
if token:
|
||||
with self.lock:
|
||||
if token in self.tokens:
|
||||
del self.tokens[token]
|
||||
if token in self.services:
|
||||
del self.services[token]
|
||||
|
||||
def validate_token(self, token, ip):
|
||||
# 验证令牌有效性:存在、IP匹配、未过期
|
||||
with self.lock:
|
||||
token_info = self.tokens.get(token)
|
||||
if token_info and token_info['ip'] == ip and token_info['expiry'] > time.time():
|
||||
return True
|
||||
return False
|
||||
|
||||
def find_service_provider(self, service_name):
|
||||
# 查找可用服务提供者
|
||||
for token, info in self.services.items():
|
||||
if service_name in info['services']:
|
||||
return token
|
||||
return None
|
||||
|
||||
def start(self):
|
||||
# 启动协调器服务
|
||||
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
server.bind((self.host, self.port))
|
||||
server.listen(5)
|
||||
print(f"Coordinator listening on {self.host}:{self.port}")
|
||||
|
||||
while True:
|
||||
conn, addr = server.accept()
|
||||
# 为每个连接创建独立线程
|
||||
client_thread = threading.Thread(
|
||||
target=self.handle_client,
|
||||
args=(conn, addr),
|
||||
daemon=True
|
||||
)
|
||||
client_thread.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
coordinator = Coordinator()
|
||||
coordinator.start()
|
||||
182
provider.py
182
provider.py
@ -1,182 +0,0 @@
|
||||
import socket
|
||||
import threading
|
||||
import json
|
||||
import hashlib
|
||||
import time
|
||||
import struct
|
||||
|
||||
|
||||
# 定义 Provider 类,用于处理与协调器的连接和P2P通信
|
||||
class Provider:
|
||||
def __init__(self, coordinator_host='www.awin-x.top', coordinator_port=5000):
|
||||
# 初始化协调器的主机和端口
|
||||
self.coordinator_addr = (coordinator_host, coordinator_port)
|
||||
# 创建与协调器的TCP连接
|
||||
self.coord_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.coord_conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.coord_conn.connect(self.coordinator_addr)
|
||||
self.local_port = self.coord_conn.getsockname()[1]
|
||||
# 用于存储认证令牌
|
||||
self.token = None
|
||||
# 定义可提供的服务及其默认端口
|
||||
self.service_ports = {'ssh': 22, 'alist': 5244, 'minecraft': 25565}
|
||||
# 存储连接的客户端
|
||||
self.connections = {}
|
||||
# 用于线程安全操作的锁
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def connect_to_coordinator(self):
|
||||
# 发送登录请求
|
||||
self._send_json({'action': 'login', 'account': 'admin'})
|
||||
response = self._recv_json()
|
||||
|
||||
# 处理协调器返回的盐值并进行密码哈希验证
|
||||
if response.get('status') == 'salt':
|
||||
salt = response['salt']
|
||||
password_hash = hashlib.sha256((salt + "admin_password").encode()).hexdigest()
|
||||
self._send_json({'action': 'auth', 'hash': password_hash})
|
||||
response = self._recv_json()
|
||||
|
||||
# 如果认证成功,存储令牌并注册服务
|
||||
if response.get('status') == 'success':
|
||||
self.token = response['token']
|
||||
print(f"Authenticated. Token: {self.token}")
|
||||
|
||||
self._send_json({
|
||||
'action': 'register_service',
|
||||
'services': list(self.service_ports.keys()),
|
||||
'token': self.token
|
||||
})
|
||||
response = self._recv_json()
|
||||
if response.get('status') == 'success':
|
||||
print("Services registered")
|
||||
return True
|
||||
print("Connection to coordinator failed")
|
||||
return False
|
||||
|
||||
def handle_punch_request(self, data):
|
||||
connector_addr = tuple(data['connector_addr'])
|
||||
service_name = data['service_name']
|
||||
print(f"Punching hole to connector at {connector_addr}, waiting 10 seconds...")
|
||||
|
||||
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
udp_socket.bind(('0.0.0.0', self.local_port))
|
||||
# 向对方发送打洞包
|
||||
for i in range(10):
|
||||
udp_socket.sendto(b'punch punch punch punch', connector_addr)
|
||||
time.sleep(0.2)
|
||||
udp_socket.close()
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
punch_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
punch_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
punch_sock.settimeout(10)
|
||||
punch_sock.bind(('0.0.0.0', self.local_port))
|
||||
|
||||
try:
|
||||
punch_sock.connect(connector_addr)
|
||||
print("Successfully connected to connector after delay")
|
||||
threading.Thread(
|
||||
target=self.handle_connector_connection,
|
||||
args=(punch_sock, service_name),
|
||||
daemon=True
|
||||
).start()
|
||||
except socket.error as e:
|
||||
print(f"Punching failed: {e}")
|
||||
punch_sock.close()
|
||||
|
||||
def handle_connector_connection(self, sock, service_name):
|
||||
# 处理与客户端的连接,启动心跳机制
|
||||
threading.Thread(target=self.send_heartbeats, args=(sock,), daemon=True).start()
|
||||
|
||||
try:
|
||||
while True:
|
||||
# 接收连接头信息
|
||||
header = sock.recv(5)
|
||||
if not header:
|
||||
break
|
||||
|
||||
conn_id, data_len = struct.unpack("!I B", header)
|
||||
data = sock.recv(data_len) if data_len > 0 else b''
|
||||
|
||||
if not data:
|
||||
with self.lock:
|
||||
if conn_id in self.connections:
|
||||
self.connections[conn_id].close()
|
||||
del self.connections[conn_id]
|
||||
continue
|
||||
|
||||
with self.lock:
|
||||
if conn_id not in self.connections:
|
||||
service_port = self.service_ports.get(service_name, 22)
|
||||
service_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
service_sock.connect(('127.0.0.1', service_port))
|
||||
self.connections[conn_id] = service_sock
|
||||
threading.Thread(
|
||||
target=self.forward_data,
|
||||
args=(service_sock, sock, conn_id),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
self.connections[conn_id].sendall(data)
|
||||
except ConnectionResetError:
|
||||
pass
|
||||
finally:
|
||||
sock.close()
|
||||
with self.lock:
|
||||
for conn_id, service_sock in list(self.connections.items()):
|
||||
service_sock.close()
|
||||
self.connections.clear()
|
||||
|
||||
def forward_data(self, src, dst, conn_id):
|
||||
# 转发数据
|
||||
try:
|
||||
while True:
|
||||
data = src.recv(4096)
|
||||
if not data:
|
||||
break
|
||||
header = struct.pack("!I B", conn_id, len(data))
|
||||
dst.sendall(header + data)
|
||||
finally:
|
||||
src.close()
|
||||
with self.lock:
|
||||
if conn_id in self.connections:
|
||||
del self.connections[conn_id]
|
||||
|
||||
def send_heartbeats(self, sock):
|
||||
# 发送心跳包以保持连接
|
||||
while True:
|
||||
try:
|
||||
sock.sendall(b'\x00\x00\x00\x00\x00') # Empty heartbeat
|
||||
time.sleep(5)
|
||||
except:
|
||||
break
|
||||
|
||||
def start(self):
|
||||
# 启动提供者,连接到协调器并开始处理请求
|
||||
if not self.connect_to_coordinator():
|
||||
return
|
||||
|
||||
try:
|
||||
while True:
|
||||
data = self._recv_json()
|
||||
if data and data.get('action') == 'punch_request':
|
||||
self.handle_punch_request(data)
|
||||
except (ConnectionResetError, json.JSONDecodeError):
|
||||
print("Disconnected from coordinator")
|
||||
|
||||
def _send_json(self, data):
|
||||
# 发送JSON数据
|
||||
self.coord_conn.sendall(json.dumps(data).encode())
|
||||
|
||||
def _recv_json(self):
|
||||
# 接收JSON数据
|
||||
data = self.coord_conn.recv(4096)
|
||||
return json.loads(data.decode()) if data else None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
provider = Provider()
|
||||
provider.start()
|
||||
155
server.py
Normal file
155
server.py
Normal file
@ -0,0 +1,155 @@
|
||||
import json
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
|
||||
services = {
|
||||
'ssh': 22,
|
||||
'alist': 5244,
|
||||
'minecraft': 25565
|
||||
}
|
||||
co_server = 'www.awin-x.top'
|
||||
co_port = 5000
|
||||
|
||||
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
tcp_sock.bind(('0.0.0.0', 0))
|
||||
tcp_sock.listen(5)
|
||||
listen_port = tcp_sock.getsockname()[1]
|
||||
print(f"监听端口: {listen_port}")
|
||||
|
||||
|
||||
def pack_data(action_, message_, data_):
|
||||
# 将字符串编码为UTF-8字节串
|
||||
action_bytes = action_.encode('utf-8')
|
||||
message_bytes = message_.encode('utf-8')
|
||||
# 使用大端序打包长度信息(4字节无符号整数)
|
||||
# >I: 大端序无符号整型(4字节)
|
||||
packed = struct.pack('>I', len(action_bytes))
|
||||
packed += action_bytes
|
||||
packed += struct.pack('>I', len(message_bytes))
|
||||
packed += message_bytes
|
||||
packed += data_ # 直接附加二进制数据
|
||||
return packed
|
||||
|
||||
|
||||
def unpack_data(packed):
|
||||
# 解析action长度(前4字节)
|
||||
action_len = struct.unpack('>I', packed[:4])[0]
|
||||
offset = 4
|
||||
# 提取action字节并解码
|
||||
action_bytes = packed[offset:offset + action_len]
|
||||
action_ = action_bytes.decode('utf-8')
|
||||
offset += action_len
|
||||
# 解析message长度(接下来的4字节)
|
||||
message_len = struct.unpack('>I', packed[offset:offset + 4])[0]
|
||||
offset += 4
|
||||
# 提取message字节并解码
|
||||
message_bytes = packed[offset:offset + message_len]
|
||||
message_ = message_bytes.decode('utf-8')
|
||||
offset += message_len
|
||||
# 剩余部分是原始二进制数据
|
||||
data_ = packed[offset:]
|
||||
return action_, message_, data_
|
||||
|
||||
|
||||
def send_data(conn, action_, message_, data_):
|
||||
packed = pack_data(action_, message_, data_)
|
||||
conn.sendall(packed)
|
||||
|
||||
|
||||
def recv_data(conn):
|
||||
data_ = conn.recv(4096)
|
||||
action_, message_, data_ = unpack_data(data_)
|
||||
# print(f"收到来自 {conn.getpeername()} 的数据: {data.decode()}")
|
||||
return action_, message_, data_
|
||||
|
||||
|
||||
def handle_hello(addr, message, data):
|
||||
pass
|
||||
|
||||
|
||||
def handle_bye(addr, message, data):
|
||||
pass
|
||||
|
||||
|
||||
def handle_punch_to(addr, message, data):
|
||||
pass
|
||||
|
||||
|
||||
actions = {
|
||||
'hello': handle_hello,
|
||||
'bye': lambda: print("收到来自 {client_addr} 的断开连接请求"),
|
||||
'punch_to': lambda: print("收到来自 {client_addr} 的打洞请求"),
|
||||
}
|
||||
|
||||
|
||||
def heart_beat_thread():
|
||||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
udp_sock.bind(('0.0.0.0', listen_port))
|
||||
heart_beat_pack = pack_data('heart_beat', 'json_data', json.dumps({
|
||||
'services': services,
|
||||
}).encode())
|
||||
while True:
|
||||
udp_sock.sendto(heart_beat_pack, (co_server, co_port))
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
def co_server_thread(co_server_sock, co_server_addr):
|
||||
while True:
|
||||
try:
|
||||
action, message, data = recv_data(co_server_sock)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_data(co_server_addr, 'error', f"无法解析数据", b'')
|
||||
break
|
||||
if action in actions:
|
||||
actions[action](co_server_addr, message, data)
|
||||
else:
|
||||
send_data(co_server_addr, 'error', f"未知操作: {action}", b'')
|
||||
co_server_sock.close()
|
||||
|
||||
|
||||
def client_thread(client_sock, client_addr):
|
||||
while True:
|
||||
try:
|
||||
action, message, data = recv_data(client_sock)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_data(client_sock, 'error', f"无法解析数据", b'')
|
||||
break
|
||||
if action in actions:
|
||||
actions[action](client_sock, client_addr, message, data)
|
||||
else:
|
||||
send_data(client_sock, 'error', f"未知操作: {action}", b'')
|
||||
client_sock.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 心跳线程
|
||||
threading.Thread(target=heart_beat_thread, daemon=True).start()
|
||||
|
||||
# 等待协服务器
|
||||
while True:
|
||||
co_server_sock, co_server_addr = tcp_sock.accept()
|
||||
action, message, data = recv_data(co_server_sock)
|
||||
if action == 'hello':
|
||||
if handle_hello(co_server_addr, message, data) == 'co_server':
|
||||
print(f'协服务器{co_server_addr}连接成功')
|
||||
threading.Thread(target=co_server_thread, daemon=True).start()
|
||||
break
|
||||
co_server_sock.close()
|
||||
|
||||
# 等待客户端
|
||||
while True:
|
||||
client_sock, client_addr = tcp_sock.accept()
|
||||
action, message, data = recv_data(client_sock)
|
||||
print(f"收到来自 {client_addr} 的连接")
|
||||
if action == 'hello':
|
||||
if handle_hello(co_server_addr, message, data) == 'client':
|
||||
print(f"确认客户端 {client_addr}")
|
||||
threading.Thread(target=client_thread, daemon=True).start()
|
||||
break
|
||||
client_sock.close()
|
||||
Loading…
Reference in New Issue
Block a user