Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f7211bb0b5 | |||
| bcab343fe6 | |||
| bb3be9c41b | |||
| 1797c5c813 | |||
| 9c4a6af1bd | |||
| 67d9fdc492 |
125
client.py
Normal file
125
client.py
Normal file
@ -0,0 +1,125 @@
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from utils import *
|
||||
|
||||
client_token = f"client_{uuid.uuid4().hex[:8]}"
|
||||
my_charactor = "client"
|
||||
client_name = 'awin_client'
|
||||
|
||||
server_name = 'awin_server'
|
||||
service_name = 'alist'
|
||||
|
||||
co_server = 'www.awin-x.top'
|
||||
co_port = 5000
|
||||
|
||||
co_server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
co_server_sock.connect((co_server, co_port))
|
||||
|
||||
listen_port = 12345
|
||||
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
listen_sock.bind(('0.0.0.0', listen_port))
|
||||
listen_sock.listen(5)
|
||||
|
||||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp_sock.bind(('0.0.0.0', 0))
|
||||
tcp_port = udp_sock.getsockname()[1]
|
||||
|
||||
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
conns = {}
|
||||
|
||||
|
||||
def client_thread(conn_id):
|
||||
client_sock = conns[conn_id]['client_sock']
|
||||
while True:
|
||||
data = client_sock.recv(4096)
|
||||
send_data_tcp(server_sock, data, {'conn_id': conn_id})
|
||||
|
||||
|
||||
def tcp_accept_thread():
|
||||
while True:
|
||||
client_sock, client_addr = listen_sock.accept()
|
||||
conn_id = f"conn_{uuid.uuid4().hex[:8]}"
|
||||
conns[conn_id] = {
|
||||
'client_sock': client_sock,
|
||||
'client_addr': client_addr,
|
||||
'ready': False
|
||||
}
|
||||
send_action_tcp(server_sock, 'connect_service', {
|
||||
'service': service_name,
|
||||
'conn_id': conn_id,
|
||||
'client_addr': client_addr,
|
||||
'client_name': client_name
|
||||
})
|
||||
count = 0
|
||||
while not conns[conn_id]['ready']:
|
||||
if count > 50:
|
||||
send_action_tcp(server_sock, 'disconnect_service', {
|
||||
'conn_id': conn_id
|
||||
})
|
||||
break
|
||||
time.sleep(0.1)
|
||||
threading.Thread(target=client_thread, args=conn_id).start()
|
||||
|
||||
|
||||
def handle_data(conn_id, data):
|
||||
conns[conn_id]['client_sock'].sendall(data)
|
||||
|
||||
|
||||
def server_thread():
|
||||
global client_name, client_token
|
||||
send_action_tcp(server_sock, 'client_hello', {
|
||||
'client_name': client_name,
|
||||
'client_token': client_token
|
||||
})
|
||||
while True:
|
||||
action, message, data = recv_tcp(server_sock)
|
||||
if action == 'error':
|
||||
print(message['message'])
|
||||
elif action == 'data':
|
||||
handle_data(message['conn_id'], data)
|
||||
elif action == 'bye':
|
||||
break
|
||||
server_sock.close()
|
||||
print("bye, from server")
|
||||
exit()
|
||||
|
||||
|
||||
def main():
|
||||
send_action_tcp(co_server_sock, 'client_hello', {
|
||||
'name': client_name,
|
||||
'token': client_token,
|
||||
'charactor': my_charactor,
|
||||
})
|
||||
send_action_tcp(co_server_sock, 'punch_request', {
|
||||
'server_name': 'awin_server'
|
||||
})
|
||||
action, message, data = recv_tcp(co_server_sock)
|
||||
if action == 'punch_request':
|
||||
co_server_punch_port = message['co_server_punch_port']
|
||||
else:
|
||||
print('punch request failed')
|
||||
return
|
||||
print(f'暴露端口:{tcp_port}->{(co_server, co_server_punch_port)}')
|
||||
packed = pack_data(action='client_punch_port')
|
||||
for i in range(5):
|
||||
udp_sock.sendto(packed, (co_server, co_server_punch_port))
|
||||
time.sleep(0.1)
|
||||
action, message, data = recv_tcp(co_server_sock)
|
||||
if action == 'punch_to':
|
||||
target_addr = tuple(message['target_addr'])
|
||||
print(f'打洞到{target_addr}')
|
||||
udp_sock.close()
|
||||
server_sock.bind(('0.0.0.0', tcp_port))
|
||||
server_sock.connect(target_addr)
|
||||
threading.Thread(target=server_thread, daemon=True).start()
|
||||
else:
|
||||
print(action)
|
||||
print('punch to failed')
|
||||
return
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
151
co_server.py
Normal file
151
co_server.py
Normal file
@ -0,0 +1,151 @@
|
||||
import socket
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from utils import *
|
||||
|
||||
co_server_port = 5000
|
||||
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
tcp_sock.bind(('0.0.0.0', co_server_port))
|
||||
tcp_sock.listen(5)
|
||||
|
||||
servers = {}
|
||||
|
||||
clients = {}
|
||||
|
||||
co_server_token = f"co_server_{uuid.uuid4().hex[:8]}"
|
||||
my_charactor = 'co_server'
|
||||
|
||||
|
||||
def handle_punch_request(client_name, server_name):
|
||||
client = clients[client_name]
|
||||
server = servers[server_name]
|
||||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
udp_sock.bind(('0.0.0.0', 5002))
|
||||
udp_sock.settimeout(20)
|
||||
co_server_punch_port = udp_sock.getsockname()[1]
|
||||
print(f'向客户端和服务端发起端口暴露请求到{co_server_punch_port}')
|
||||
send_action_tcp(server['sock'], 'punch_request', {
|
||||
'co_server_punch_port': co_server_punch_port
|
||||
})
|
||||
send_action_tcp(client['sock'], 'punch_request', {
|
||||
'co_server_punch_port': co_server_punch_port
|
||||
})
|
||||
client_punch_port = 0
|
||||
server_punch_port = 0
|
||||
count = 0
|
||||
while client_punch_port == 0 or server_punch_port == 0:
|
||||
action, message, data, addr = recv_udp(udp_sock)
|
||||
if action == 'server_punch_port':
|
||||
print(f'获取到服务端打洞端口{addr[1]}')
|
||||
server_punch_port = addr[1]
|
||||
if action == 'client_punch_port':
|
||||
print(f'获取到客户端打洞端口{addr[1]}')
|
||||
client_punch_port = addr[1]
|
||||
time.sleep(0.1)
|
||||
count += 1
|
||||
udp_sock.close()
|
||||
print('发送打洞请求')
|
||||
send_action_tcp(client['sock'], 'punch_to', {
|
||||
'server_name': server_name,
|
||||
'server_token': server['token'],
|
||||
'target_addr': (server['addr'][0], server_punch_port)
|
||||
})
|
||||
send_action_tcp(server['sock'], 'punch_to', {
|
||||
'client_name': client_name,
|
||||
'client_token': client['token'],
|
||||
'target_addr': (client['addr'][0], client_punch_port)
|
||||
})
|
||||
|
||||
|
||||
def handle_server(sock, addr, server_info):
|
||||
server_name = server_info['name']
|
||||
server_token = server_info['token']
|
||||
if server_name in servers:
|
||||
send_action_tcp(sock, 'bye', 'server already exists')
|
||||
return
|
||||
send_action_tcp(sock, 'co_server_hello', {
|
||||
'charactor': my_charactor,
|
||||
'token': co_server_token,
|
||||
})
|
||||
servers[server_name] = {
|
||||
'sock': sock,
|
||||
'addr': addr,
|
||||
'name': server_name,
|
||||
'token': server_token,
|
||||
}
|
||||
while True:
|
||||
try:
|
||||
action, message, data = recv_tcp(sock)
|
||||
if action == 'ping':
|
||||
send_action_tcp(sock, 'pong')
|
||||
else:
|
||||
send_action_tcp(sock, 'bye', 'unknown action')
|
||||
break
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_action_tcp(sock, 'bye', f'error: {e}')
|
||||
break
|
||||
sock.close()
|
||||
del servers[server_name]
|
||||
|
||||
|
||||
def handle_client(sock, addr, client_info):
|
||||
client_name = client_info['name']
|
||||
client_token = client_info['token']
|
||||
clients[client_name] = {
|
||||
'sock': sock,
|
||||
'addr': addr,
|
||||
'token': client_token,
|
||||
}
|
||||
while True:
|
||||
try:
|
||||
action, message, data = recv_tcp(sock)
|
||||
if action == 'ping':
|
||||
send_action_tcp(sock, 'pong')
|
||||
elif action == 'punch_request':
|
||||
server_name = message['server_name']
|
||||
if server_name in servers:
|
||||
handle_punch_request(client_name, server_name)
|
||||
else:
|
||||
send_action_tcp(sock, 'error', f"服务器不存在: {server_name}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_action_tcp(sock, 'bye', f'error: {e}')
|
||||
break
|
||||
sock.close()
|
||||
del clients[client_name]
|
||||
|
||||
|
||||
def handle_connect(sock, addr):
|
||||
try:
|
||||
action, message, data = recv_tcp(sock)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_action_tcp(sock, 'error', f"无法解析数据")
|
||||
return
|
||||
if action == 'server_hello':
|
||||
handle_server(sock, addr, message)
|
||||
elif action == 'client_hello':
|
||||
handle_client(sock, addr, message)
|
||||
else:
|
||||
print(f"未知连接:{addr} -> {action}")
|
||||
sock.close()
|
||||
|
||||
|
||||
def main():
|
||||
print(f"{my_charactor} 启动")
|
||||
while True:
|
||||
conn, addr = tcp_sock.accept()
|
||||
print(f"{my_charactor} 收到来自 {addr} 的连接")
|
||||
threading.Thread(
|
||||
target=handle_connect,
|
||||
args=(conn, addr),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
232
connector1.py
232
connector1.py
@ -1,232 +0,0 @@
|
||||
import socket
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
||||
class ServiceConnector:
|
||||
def __init__(self, coordinator_addr, service_name, local_port):
|
||||
self.target_id = None
|
||||
self.coordinator_addr = coordinator_addr
|
||||
self.service_name = service_name
|
||||
self.local_port = local_port
|
||||
self.client_id = f"connector-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# 创建UDP套接字用于协调通信
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind(('0.0.0.0', 0))
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
||||
|
||||
# 创建TCP套接字用于本地监听
|
||||
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.tcp_sock.bind(('127.0.0.1', local_port))
|
||||
self.tcp_sock.listen(5)
|
||||
print(f"本地端口映射: 127.0.0.1:{local_port} -> 远程服务 '{service_name}'")
|
||||
|
||||
# 存储活动连接
|
||||
self.active_connections = {}
|
||||
self.provider_addr = None
|
||||
self.internal_port = None
|
||||
self.running = True
|
||||
|
||||
def request_service(self):
|
||||
"""向协调服务器请求服务"""
|
||||
message = {
|
||||
'action': 'request',
|
||||
'service_name': self.service_name,
|
||||
'client_id': self.client_id
|
||||
}
|
||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
||||
|
||||
# 等待响应
|
||||
data, _ = self.udp_sock.recvfrom(4096)
|
||||
response = json.loads(data.decode())
|
||||
if response['status'] == 'success':
|
||||
self.provider_addr = tuple(response['provider_addr'])
|
||||
self.internal_port = response['internal_port']
|
||||
self.target_id = response['target_id']
|
||||
print(f"找到服务提供者: {self.provider_addr}, 端口: {self.internal_port}")
|
||||
return True
|
||||
else:
|
||||
print(f"服务请求失败: {response['message']}")
|
||||
return False
|
||||
|
||||
def punch_hole(self):
|
||||
"""执行UDP打洞"""
|
||||
if not self.provider_addr:
|
||||
return False
|
||||
|
||||
# 请求打洞
|
||||
message = {
|
||||
'action': 'punch_request',
|
||||
'client_id': self.client_id,
|
||||
'target_id': self.target_id
|
||||
}
|
||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
||||
|
||||
# 等待协调服务器响应
|
||||
data, _ = self.udp_sock.recvfrom(4096)
|
||||
response = json.loads(data.decode())
|
||||
if response['status'] != 'success':
|
||||
print(f"打洞请求失败: {response['message']}")
|
||||
return False
|
||||
|
||||
# 向服务提供者发送打洞包
|
||||
print(f"尝试打洞到 {self.provider_addr}...")
|
||||
for _ in range(5):
|
||||
self.udp_sock.sendto(json.dumps({'action': 'punch', 'client_id': self.client_id}).encode(), self.provider_addr)
|
||||
time.sleep(0.5)
|
||||
|
||||
# 检查连通性
|
||||
self.udp_sock.settimeout(10.0)
|
||||
try:
|
||||
self.udp_sock.sendto(json.dumps({'action': 'punch_check'}).encode(), self.provider_addr)
|
||||
data, addr = self.udp_sock.recvfrom(1024)
|
||||
if json.loads(data.decode())['client_id'] == self.client_id and addr == self.provider_addr:
|
||||
print("打洞成功! 已建立UDP连接")
|
||||
return True
|
||||
else:
|
||||
print(f"错误的打洞响应{data}")
|
||||
return False
|
||||
except socket.timeout:
|
||||
print("打洞失败: 未收到响应")
|
||||
return False
|
||||
finally:
|
||||
self.udp_sock.settimeout(None)
|
||||
|
||||
def udp_listener(self):
|
||||
"""监听UDP消息"""
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(65535)
|
||||
message = json.loads(data.decode())
|
||||
|
||||
if message['action'] == 'punch_response':
|
||||
# 打洞响应 - 确认连通性
|
||||
print(f"收到来自 {addr} 的打洞响应")
|
||||
elif message['action'] == 'data':
|
||||
if message['conn_id'] in self.active_connections:
|
||||
# 转发数据到本地客户端
|
||||
print(f"收到来自 {addr} 的数据, 转发到本地连接 {message['conn_id']}\n{message['data']}")
|
||||
self.active_connections[message['conn_id']].send(bytes.fromhex(message['data']))
|
||||
else:
|
||||
print(f"收到来自 {addr} 的数据, 但找不到对应的本地连接")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_conn',
|
||||
'conn_id': message['conn_id']
|
||||
}).encode(), addr)
|
||||
elif message['action'] == 'stop_conn':
|
||||
# 停止连接
|
||||
if message['conn_id'] in self.active_connections:
|
||||
self.active_connections[message['conn_id']].close()
|
||||
del self.active_connections[message['conn_id']]
|
||||
print(f"已关闭本地连接 {message['conn_id']}")
|
||||
else:
|
||||
print(f"收到未知消息: {message}")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def tcp_listener(self):
|
||||
"""监听本地TCP连接"""
|
||||
while self.running:
|
||||
try:
|
||||
client_sock, client_addr = self.tcp_sock.accept()
|
||||
print(f"新的本地连接来自 {client_addr}")
|
||||
|
||||
# 为每个连接生成唯一ID
|
||||
conn_id = str(uuid.uuid4())
|
||||
|
||||
# 存储连接
|
||||
self.active_connections[conn_id] = client_sock
|
||||
|
||||
# 请求服务提供者建立连接
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'connect',
|
||||
'client_id': self.client_id,
|
||||
'conn_id': conn_id
|
||||
}).encode(), self.provider_addr)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# 启动数据转发线程
|
||||
threading.Thread(
|
||||
target=self.forward_data,
|
||||
args=(conn_id, client_sock),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
def forward_data(self, conn_id, client_sock):
|
||||
"""转发本地TCP数据到UDP隧道"""
|
||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
||||
try:
|
||||
while True:
|
||||
# 从本地客户端读取数据
|
||||
data = client_sock.recv(65535)
|
||||
if not data:
|
||||
break
|
||||
|
||||
# 通过UDP发送给服务提供者
|
||||
print(f"发送数据到服务提供者: {self.provider_addr}")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'conn_id': conn_id,
|
||||
'data': data.hex() # 十六进制编码二进制数据
|
||||
}).encode(), self.provider_addr)
|
||||
# 分片发送数据
|
||||
# for i in range(0, len(data), CHUNK_SIZE):
|
||||
# chunk = data[i:i + CHUNK_SIZE]
|
||||
# self.udp_sock.sendto(json.dumps({
|
||||
# 'action': 'data',
|
||||
# 'conn_id': conn_id,
|
||||
# 'data': chunk.hex()
|
||||
# }).encode(), self.provider_addr)
|
||||
except:
|
||||
pass
|
||||
finally:
|
||||
client_sock.close()
|
||||
if conn_id in self.active_connections:
|
||||
del self.active_connections[conn_id]
|
||||
|
||||
def run(self):
|
||||
"""运行服务连接端"""
|
||||
# 请求服务
|
||||
if not self.request_service():
|
||||
return
|
||||
|
||||
# 执行打洞
|
||||
if not self.punch_hole():
|
||||
return
|
||||
|
||||
# 启动UDP监听线程
|
||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||
|
||||
# 启动TCP监听线程
|
||||
threading.Thread(target=self.tcp_listener, daemon=True).start()
|
||||
|
||||
# 保持主线程运行
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_client',
|
||||
'client_id': self.client_id
|
||||
}).encode(), self.provider_addr)
|
||||
self.udp_sock.close()
|
||||
self.tcp_sock.close()
|
||||
print("服务连接端已停止")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 配置信息
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
||||
SERVICE_NAME = "my_game_server"
|
||||
LOCAL_PORT = 12345 # 本地映射端口
|
||||
|
||||
connector = ServiceConnector(COORDINATOR_ADDR, SERVICE_NAME, LOCAL_PORT)
|
||||
connector.run()
|
||||
140
coordinator.py
140
coordinator.py
@ -1,140 +0,0 @@
|
||||
import socket
|
||||
import json
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
class CoordinatorServer:
|
||||
def __init__(self, host='0.0.0.0', port=5000):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.services = {} # 服务名 -> (公网地址, 内网端口)
|
||||
self.clients = defaultdict(dict) # 客户端ID -> 信息
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind((host, port))
|
||||
self.running = True
|
||||
print(f"协调服务器运行在 {host}:{port}")
|
||||
|
||||
def handle_register(self, data, addr):
|
||||
"""处理服务注册请求"""
|
||||
try:
|
||||
service_name = data['service_name']
|
||||
internal_port = data['internal_port']
|
||||
client_id = data['client_id']
|
||||
|
||||
# 记录客户端信息
|
||||
self.clients[client_id] = {
|
||||
'addr': addr,
|
||||
'service_name': service_name,
|
||||
'internal_port': internal_port,
|
||||
'last_seen': time.time()
|
||||
}
|
||||
|
||||
# 注册服务
|
||||
self.services[service_name] = (addr, internal_port)
|
||||
print(f"服务注册: {service_name} (端口:{internal_port}) 来自 {addr}")
|
||||
|
||||
return {'status': 'success', 'message': '服务注册成功'}
|
||||
except Exception as e:
|
||||
return {'status': 'error', 'message': str(e)}
|
||||
|
||||
def handle_request(self, data, addr):
|
||||
"""处理服务请求"""
|
||||
try:
|
||||
service_name = data['service_name']
|
||||
client_id = data['client_id']
|
||||
|
||||
if service_name not in self.services:
|
||||
return {'status': 'error', 'message': '服务未找到'}
|
||||
|
||||
# 记录请求客户端信息
|
||||
self.clients[client_id] = {
|
||||
'addr': addr,
|
||||
'service_name': service_name,
|
||||
'last_seen': time.time()
|
||||
}
|
||||
|
||||
# 获取服务提供者的信息
|
||||
provider_addr, internal_port = self.services[service_name]
|
||||
target_id = [
|
||||
client_id
|
||||
for client_id, info in self.clients.items()
|
||||
if info['service_name'] == service_name
|
||||
][0]
|
||||
|
||||
print(f"服务请求: {service_name} 来自 {addr}, 提供者 {provider_addr}")
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'provider_addr': provider_addr,
|
||||
'internal_port': internal_port,
|
||||
'target_id': target_id
|
||||
}
|
||||
except Exception as e:
|
||||
return {'status': 'error', 'message': str(e)}
|
||||
|
||||
def handle_punch_request(self, data, addr):
|
||||
"""处理打洞请求"""
|
||||
try:
|
||||
client_id = data['client_id']
|
||||
target_id = data['target_id']
|
||||
|
||||
# 获取目标客户端信息
|
||||
if target_id not in self.clients:
|
||||
return {'status': 'error', 'message': '目标客户端未找到'}
|
||||
|
||||
target_addr = self.clients[target_id]['addr']
|
||||
|
||||
print(f"打洞请求: {addr} -> {target_addr}")
|
||||
|
||||
# 通知双方对方的地址
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'punch_request',
|
||||
'client_id': client_id,
|
||||
'client_addr': addr
|
||||
}).encode(), target_addr)
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'target_addr': target_addr
|
||||
}
|
||||
except Exception as e:
|
||||
return {'status': 'error', 'message': str(e)}
|
||||
|
||||
def run(self):
|
||||
"""运行协调服务器"""
|
||||
print("协调服务器已启动,等待连接...")
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(4096)
|
||||
try:
|
||||
message = json.loads(data.decode())
|
||||
action = message.get('action')
|
||||
|
||||
if action == 'register':
|
||||
response = self.handle_register(message, addr)
|
||||
elif action == 'request':
|
||||
response = self.handle_request(message, addr)
|
||||
elif action == 'punch_request':
|
||||
response = self.handle_punch_request(message, addr)
|
||||
elif action == 'stop_provider':
|
||||
self.services.pop(message['service_name'], None)
|
||||
response = {'status': 'success', 'message': '服务停止成功'}
|
||||
else:
|
||||
response = {'status': 'error', 'message': '无效操作'}
|
||||
|
||||
self.udp_sock.sendto(json.dumps(response).encode(), addr)
|
||||
except json.JSONDecodeError:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'status': 'error',
|
||||
'message': '无效的JSON数据'
|
||||
}).encode(), addr)
|
||||
except Exception as e:
|
||||
print(f"服务器错误: {str(e)}")
|
||||
|
||||
self.udp_sock.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
server = CoordinatorServer()
|
||||
server.run()
|
||||
220
provider1.py
220
provider1.py
@ -1,220 +0,0 @@
|
||||
import socket
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
||||
class ServiceProvider:
|
||||
def __init__(self, coordinator_addr, service_name, internal_port):
|
||||
self.coordinator_addr = coordinator_addr
|
||||
self.service_name = service_name
|
||||
self.internal_port = internal_port
|
||||
self.client_id = f"provider-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# 创建UDP套接字用于协调通信
|
||||
self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.udp_sock.bind(('0.0.0.0', 0))
|
||||
self.udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
|
||||
|
||||
# # 创建TCP套接字用于服务监听
|
||||
# self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# self.tcp_sock.bind(('0.0.0.0', internal_port))
|
||||
# self.tcp_sock.listen(5)
|
||||
print(f"服务端监听在端口 {internal_port}")
|
||||
|
||||
# 存储活动连接
|
||||
self.active_connections = {}
|
||||
self.running = True
|
||||
|
||||
def register_service(self):
|
||||
"""向协调服务器注册服务"""
|
||||
message = {
|
||||
'action': 'register',
|
||||
'service_name': self.service_name,
|
||||
'internal_port': self.internal_port,
|
||||
'external_port': self.udp_sock.getsockname()[1],
|
||||
'client_id': self.client_id
|
||||
}
|
||||
print(f"向协调服务器 {self.coordinator_addr} 注册服务 '{self.service_name}'")
|
||||
self.udp_sock.sendto(json.dumps(message).encode(), self.coordinator_addr)
|
||||
|
||||
# 等待响应
|
||||
data, _ = self.udp_sock.recvfrom(4096)
|
||||
response = json.loads(data.decode())
|
||||
if response['status'] == 'success':
|
||||
print(f"服务 '{self.service_name}' 注册成功")
|
||||
else:
|
||||
print(f"注册失败: {response['message']}")
|
||||
|
||||
def udp_listener(self):
|
||||
"""监听UDP消息"""
|
||||
while self.running:
|
||||
try:
|
||||
data, addr = self.udp_sock.recvfrom(4096)
|
||||
print(f"收到来自 {addr} 的消息: {data.decode()}")
|
||||
message = json.loads(data.decode())
|
||||
|
||||
if message.get('action') == 'punch' or message.get('action') == 'punch_check':
|
||||
# 打洞请求 - 发送响应以确认连通性
|
||||
self.udp_sock.sendto(json.dumps(
|
||||
{'action': 'punch_response',
|
||||
'client_id': message['client_id']
|
||||
}).encode(), addr)
|
||||
print(f"收到来自 {addr} 的打洞请求,已响应")
|
||||
elif message.get('action') == 'punch_response':
|
||||
# 打洞响应 - 确认打洞成功
|
||||
print(f"收到来自 {addr} 的打洞响应")
|
||||
elif message.get('action') == 'connect':
|
||||
# 新的连接请求
|
||||
conn_id = message['conn_id']
|
||||
client_id = message['client_id']
|
||||
print(f"收到来自 {addr} 的连接请求")
|
||||
threading.Thread(
|
||||
target=self.handle_connection,
|
||||
args=(conn_id, addr, client_id),
|
||||
daemon=True
|
||||
).start()
|
||||
elif message.get('action') == 'punch_request':
|
||||
client_addr = message['client_addr']
|
||||
print(f"从协服务器收到来自 {client_addr} 的打洞请求,开始打洞")
|
||||
self.punch_hole(message)
|
||||
elif message.get('action') == 'data':
|
||||
# 接收到来自客户端的数据
|
||||
conn_id = message['conn_id']
|
||||
data = bytes.fromhex(message['data'])
|
||||
if conn_id in self.active_connections:
|
||||
# 转发数据到本地服务
|
||||
print(f"收到来自 {addr} 的数据,转发到本地服务")
|
||||
self.active_connections[conn_id]['local_sock'].sendall(data)
|
||||
else:
|
||||
print(f"收到来自 {addr} 的数据,但未找到对应的连接")
|
||||
elif message.get('action') == 'stop_conn':
|
||||
conn_id = message['conn_id']
|
||||
if conn_id in self.active_connections:
|
||||
self.active_connections[conn_id]['local_sock'].close()
|
||||
self.active_connections.pop(conn_id)
|
||||
elif message.get('action') == 'stop_client':
|
||||
client_id = message['client_id']
|
||||
for conn_id, conn_info in self.active_connections.items():
|
||||
if conn_info['client_id'] == client_id:
|
||||
conn_info['local_sock'].close()
|
||||
self.active_connections.pop(conn_id)
|
||||
else:
|
||||
print(f"收到未知消息: {message}")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
pass
|
||||
|
||||
def handle_connection(self, conn_id, client_addr, client_id):
|
||||
"""处理来自客户端的连接"""
|
||||
try:
|
||||
# 接受本地服务连接
|
||||
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
local_sock.connect(('127.0.0.1', self.internal_port))
|
||||
if not local_sock:
|
||||
print("无法连接到本地服务")
|
||||
return
|
||||
|
||||
# 创建与客户端的UDP隧道
|
||||
print(f"建立连接 {conn_id} : {client_addr} -> {('127.0.0.1', self.internal_port)}")
|
||||
|
||||
# 存储连接
|
||||
self.active_connections[conn_id] = {
|
||||
'local_sock': local_sock,
|
||||
'client_addr': client_addr,
|
||||
'client_id': client_id
|
||||
}
|
||||
|
||||
# 通知客户端连接就绪
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'connected',
|
||||
'client_id': conn_id
|
||||
}).encode(), client_addr)
|
||||
|
||||
# 启动数据转发
|
||||
self.forward_data(conn_id, local_sock, client_addr)
|
||||
except Exception as e:
|
||||
print(f"连接失败: {str(e)}")
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'connect_failed',
|
||||
'client_id': conn_id,
|
||||
'message': str(e)
|
||||
}).encode(), client_addr)
|
||||
|
||||
def forward_data(self, conn_id, local_sock, client_addr):
|
||||
"""转发TCP数据到UDP隧道"""
|
||||
CHUNK_SIZE = 2048 # 根据 MTU 调整最大分片大小
|
||||
try:
|
||||
while True:
|
||||
# 从本地服务读取数据
|
||||
data = local_sock.recv(65535)
|
||||
if not data:
|
||||
break
|
||||
|
||||
# 通过UDP发送给客户端
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'data',
|
||||
'conn_id': conn_id,
|
||||
'data': data.hex() # 十六进制编码二进制数据
|
||||
}).encode(), client_addr)
|
||||
print(f"转发数据给客户端{client_addr}")
|
||||
# for i in range(0, len(data), CHUNK_SIZE):
|
||||
# chunk = data[i:i + CHUNK_SIZE]
|
||||
# self.udp_sock.sendto(json.dumps({
|
||||
# 'action': 'data',
|
||||
# 'conn_id': conn_id,
|
||||
# 'data': chunk.hex()
|
||||
# }).encode(), client_addr)
|
||||
except Exception as e:
|
||||
print(f"转发数据失败: {str(e)}")
|
||||
finally:
|
||||
local_sock.close()
|
||||
if conn_id in self.active_connections:
|
||||
del self.active_connections[conn_id]
|
||||
print(f"连接 {conn_id} 已关闭")
|
||||
|
||||
def run(self):
|
||||
"""运行服务提供端"""
|
||||
# 注册服务
|
||||
self.register_service()
|
||||
|
||||
# 启动UDP监听线程
|
||||
threading.Thread(target=self.udp_listener, daemon=True).start()
|
||||
|
||||
# 保持主线程运行
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
self.udp_sock.sendto(json.dumps({'action': 'stop_provider'}).encode(), self.coordinator_addr)
|
||||
for conn_id, conn_info in self.active_connections.items():
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'stop_conn',
|
||||
'conn_id': conn_id
|
||||
}).encode(), conn_info['client_addr'])
|
||||
self.udp_sock.close()
|
||||
print("服务提供端已停止")
|
||||
|
||||
def punch_hole(self, message):
|
||||
for i in range(5):
|
||||
try:
|
||||
self.udp_sock.sendto(json.dumps({
|
||||
'action': 'punch',
|
||||
'client_id': message['client_id']
|
||||
}).encode(), tuple(message['client_addr']))
|
||||
time.sleep(0.2)
|
||||
except Exception as e:
|
||||
print(f"打洞失败: {str(e)}")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 配置信息
|
||||
COORDINATOR_ADDR = ('www.awin-x.top', 5000) # 替换为公网服务器IP
|
||||
SERVICE_NAME = "my_game_server"
|
||||
INTERNAL_PORT = 5001 # 内网游戏服务器端口
|
||||
|
||||
provider = ServiceProvider(COORDINATOR_ADDR, SERVICE_NAME, INTERNAL_PORT)
|
||||
provider.run()
|
||||
155
server.py
Normal file
155
server.py
Normal file
@ -0,0 +1,155 @@
|
||||
import json
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
|
||||
services = {
|
||||
'ssh': 22,
|
||||
'alist': 5244,
|
||||
'minecraft': 25565
|
||||
}
|
||||
co_server = 'www.awin-x.top'
|
||||
co_port = 5000
|
||||
|
||||
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
tcp_sock.bind(('0.0.0.0', 0))
|
||||
tcp_sock.listen(5)
|
||||
listen_port = tcp_sock.getsockname()[1]
|
||||
print(f"监听端口: {listen_port}")
|
||||
|
||||
|
||||
def pack_data(action_, message_, data_):
|
||||
# 将字符串编码为UTF-8字节串
|
||||
action_bytes = action_.encode('utf-8')
|
||||
message_bytes = message_.encode('utf-8')
|
||||
# 使用大端序打包长度信息(4字节无符号整数)
|
||||
# >I: 大端序无符号整型(4字节)
|
||||
packed = struct.pack('>I', len(action_bytes))
|
||||
packed += action_bytes
|
||||
packed += struct.pack('>I', len(message_bytes))
|
||||
packed += message_bytes
|
||||
packed += data_ # 直接附加二进制数据
|
||||
return packed
|
||||
|
||||
|
||||
def unpack_data(packed):
|
||||
# 解析action长度(前4字节)
|
||||
action_len = struct.unpack('>I', packed[:4])[0]
|
||||
offset = 4
|
||||
# 提取action字节并解码
|
||||
action_bytes = packed[offset:offset + action_len]
|
||||
action_ = action_bytes.decode('utf-8')
|
||||
offset += action_len
|
||||
# 解析message长度(接下来的4字节)
|
||||
message_len = struct.unpack('>I', packed[offset:offset + 4])[0]
|
||||
offset += 4
|
||||
# 提取message字节并解码
|
||||
message_bytes = packed[offset:offset + message_len]
|
||||
message_ = message_bytes.decode('utf-8')
|
||||
offset += message_len
|
||||
# 剩余部分是原始二进制数据
|
||||
data_ = packed[offset:]
|
||||
return action_, message_, data_
|
||||
|
||||
|
||||
def send_data(conn, action_, message_, data_):
|
||||
packed = pack_data(action_, message_, data_)
|
||||
conn.sendall(packed)
|
||||
|
||||
|
||||
def recv_data(conn):
|
||||
data_ = conn.recv(4096)
|
||||
action_, message_, data_ = unpack_data(data_)
|
||||
# print(f"收到来自 {conn.getpeername()} 的数据: {data.decode()}")
|
||||
return action_, message_, data_
|
||||
|
||||
|
||||
def handle_hello(addr, message, data):
|
||||
pass
|
||||
|
||||
|
||||
def handle_bye(addr, message, data):
|
||||
pass
|
||||
|
||||
|
||||
def handle_punch_to(addr, message, data):
|
||||
pass
|
||||
|
||||
|
||||
actions = {
|
||||
'hello': handle_hello,
|
||||
'bye': lambda: print("收到来自 {client_addr} 的断开连接请求"),
|
||||
'punch_to': lambda: print("收到来自 {client_addr} 的打洞请求"),
|
||||
}
|
||||
|
||||
|
||||
def heart_beat_thread():
|
||||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
udp_sock.bind(('0.0.0.0', listen_port))
|
||||
heart_beat_pack = pack_data('heart_beat', 'json_data', json.dumps({
|
||||
'services': services,
|
||||
}).encode())
|
||||
while True:
|
||||
udp_sock.sendto(heart_beat_pack, (co_server, co_port))
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
def co_server_thread(co_server_sock, co_server_addr):
|
||||
while True:
|
||||
try:
|
||||
action, message, data = recv_data(co_server_sock)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_data(co_server_addr, 'error', f"无法解析数据", b'')
|
||||
break
|
||||
if action in actions:
|
||||
actions[action](co_server_addr, message, data)
|
||||
else:
|
||||
send_data(co_server_addr, 'error', f"未知操作: {action}", b'')
|
||||
co_server_sock.close()
|
||||
|
||||
|
||||
def client_thread(client_sock, client_addr):
|
||||
while True:
|
||||
try:
|
||||
action, message, data = recv_data(client_sock)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
send_data(client_sock, 'error', f"无法解析数据", b'')
|
||||
break
|
||||
if action in actions:
|
||||
actions[action](client_sock, client_addr, message, data)
|
||||
else:
|
||||
send_data(client_sock, 'error', f"未知操作: {action}", b'')
|
||||
client_sock.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 心跳线程
|
||||
threading.Thread(target=heart_beat_thread, daemon=True).start()
|
||||
|
||||
# 等待协服务器
|
||||
while True:
|
||||
co_server_sock, co_server_addr = tcp_sock.accept()
|
||||
action, message, data = recv_data(co_server_sock)
|
||||
if action == 'hello':
|
||||
if handle_hello(co_server_addr, message, data) == 'co_server':
|
||||
print(f'协服务器{co_server_addr}连接成功')
|
||||
threading.Thread(target=co_server_thread, daemon=True).start()
|
||||
break
|
||||
co_server_sock.close()
|
||||
|
||||
# 等待客户端
|
||||
while True:
|
||||
client_sock, client_addr = tcp_sock.accept()
|
||||
action, message, data = recv_data(client_sock)
|
||||
print(f"收到来自 {client_addr} 的连接")
|
||||
if action == 'hello':
|
||||
if handle_hello(co_server_addr, message, data) == 'client':
|
||||
print(f"确认客户端 {client_addr}")
|
||||
threading.Thread(target=client_thread, daemon=True).start()
|
||||
break
|
||||
client_sock.close()
|
||||
190
server1.py
Normal file
190
server1.py
Normal file
@ -0,0 +1,190 @@
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from utils import *
|
||||
|
||||
services = {
|
||||
'ssh': 22,
|
||||
'alist': 5244,
|
||||
'minecraft': 25565
|
||||
}
|
||||
server_name = 'awin_server'
|
||||
co_server = 'www.awin-x.top'
|
||||
co_port = 5000
|
||||
|
||||
server_token = f"server_{uuid.uuid4().hex[:8]}"
|
||||
my_charactor = 'server'
|
||||
|
||||
co_server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
co_server_sock.connect((co_server, co_port))
|
||||
|
||||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
udp_sock.bind(('0.0.0.0', 0))
|
||||
listen_port = udp_sock.getsockname()[1]
|
||||
|
||||
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
tcp_sock.bind(('0.0.0.0', listen_port))
|
||||
tcp_sock.listen(5)
|
||||
print(f"监听端口: {listen_port}")
|
||||
|
||||
conns = {}
|
||||
|
||||
clients = {}
|
||||
|
||||
|
||||
def heart_beat_thread():
|
||||
heart_beat_pack = pack_data('ping')
|
||||
while True:
|
||||
print('ping 到 co_server')
|
||||
send_pack_tcp(co_server_sock, heart_beat_pack)
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
def handle_punch_to(message):
|
||||
client_name = message['client_name']
|
||||
client_token = message['client_token']
|
||||
target_addr = tuple(message['target_addr'])
|
||||
clients[client_name] = {
|
||||
'client_token': client_token,
|
||||
'addr': target_addr,
|
||||
}
|
||||
print(f'打洞到{target_addr}')
|
||||
for i in range(100):
|
||||
send_action_udp(udp_sock, target_addr, 'punch')
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
def handle_punch_request(message):
|
||||
co_server_punch_port = message['co_server_punch_port']
|
||||
print(f'暴露端口:{listen_port}->{(co_server, co_server_punch_port)}')
|
||||
for i in range(5):
|
||||
send_action_udp(udp_sock, (co_server, co_server_punch_port), 'server_punch_port')
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def handle_bye(sock):
|
||||
print(f"收到来自 {sock.getpeername()} 的断开连接请求")
|
||||
try:
|
||||
send_action_tcp(sock, 'bye', 'bye from server')
|
||||
sock.close()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
def handle_data(conn_id, data):
|
||||
conns[conn_id]['conn_sock'].sendall(data)
|
||||
|
||||
|
||||
def forward_data_thread(conn_id):
|
||||
conn = conns[conn_id]
|
||||
while conn_id in conns:
|
||||
data = conn['conn_sock'].recv(4096)
|
||||
try:
|
||||
send_data_tcp(conn['client_sock'], data, {'conn_id': conn_id})
|
||||
except Exception as e:
|
||||
print(e[:10]+"->service connection breaks")
|
||||
break
|
||||
|
||||
|
||||
def handle_connect_service(message):
|
||||
service = message['service']
|
||||
service_port = services[service]
|
||||
client_name = message['client_name']
|
||||
conn_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
conn_sock.connect(('127.0.0.1', service_port))
|
||||
conn_id = message['conn_id']
|
||||
conns[conn_id] = {
|
||||
'conn_sock': conn_sock,
|
||||
'client_name': client_name,
|
||||
'service': service
|
||||
}
|
||||
threading.Thread(target=forward_data_thread, args=conn_id, daemon=True).start()
|
||||
|
||||
|
||||
def handle_disconnect_service(message):
|
||||
conn_id = message['conn_id']
|
||||
if conn_id in conns:
|
||||
conn_sock = conns[conn_id]['conn_sock']
|
||||
conn_sock.close()
|
||||
del conns[conn_id]
|
||||
|
||||
|
||||
def client_thread(client_sock, client_addr):
|
||||
while True:
|
||||
action, message, data = recv_tcp(client_sock)
|
||||
if action == 'data':
|
||||
handle_data(message['conn_id'], data)
|
||||
elif action == 'bye':
|
||||
handle_bye(client_sock)
|
||||
break
|
||||
elif action == 'connect_service':
|
||||
handle_connect_service(message)
|
||||
elif action == 'disconnect_service':
|
||||
handle_disconnect_service(message)
|
||||
else:
|
||||
send_action_tcp(client_sock, 'error', f"未知操作: {action}")
|
||||
|
||||
|
||||
def tcp_accept_thread():
|
||||
while True:
|
||||
sock, addr = tcp_sock.accept()
|
||||
print(f"收到来自 {addr} 的连接")
|
||||
action, message, data = recv_tcp(sock)
|
||||
if action == 'client_hello':
|
||||
client_name = message['client_name']
|
||||
client_token = message['client_token']
|
||||
if client_name in clients:
|
||||
if clients[client_name]['client_token'] == client_token:
|
||||
if 'sock' in clients[client_name]:
|
||||
send_action_tcp(sock, 'error', f"{client_name} 已存在")
|
||||
print(f"{client_name} 已存在")
|
||||
else:
|
||||
clients[client_name]['sock'] = sock
|
||||
clients[client_name]['addr'] = addr
|
||||
send_action_tcp(sock, 'success', f"{client_name} 已连接")
|
||||
threading.Thread(target=client_thread, args=(sock, addr)).start()
|
||||
print(f"{client_name} 已连接")
|
||||
else:
|
||||
print(f'{addr},token 错误')
|
||||
else:
|
||||
print(f'{addr},未注册')
|
||||
else:
|
||||
print(f'{addr},未知操作')
|
||||
sock.close()
|
||||
|
||||
|
||||
def main():
|
||||
send_action_tcp(co_server_sock, 'server_hello', {
|
||||
'name': server_name,
|
||||
'token': server_token,
|
||||
'charactor': my_charactor
|
||||
})
|
||||
action, message, data = recv_tcp(co_server_sock)
|
||||
if action == 'co_server_hello':
|
||||
co_server_token = message['token']
|
||||
else:
|
||||
print(f"连接到协服务器失败{message['message']}")
|
||||
return
|
||||
|
||||
threading.Thread(target=heart_beat_thread, daemon=True).start()
|
||||
|
||||
while True:
|
||||
action, message, data = recv_tcp(co_server_sock)
|
||||
if action == 'punch_to':
|
||||
handle_punch_to(message)
|
||||
elif action == 'punch_request':
|
||||
handle_punch_request(message)
|
||||
elif action == 'pong':
|
||||
print('pong 来自 co_server')
|
||||
# elif action == 'data':
|
||||
# handle_data(co_server_sock, co_server_addr, message, data)
|
||||
elif action == 'bye':
|
||||
handle_bye(co_server_sock)
|
||||
else:
|
||||
print(f'收到来自co_server的未知消息{action}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
104
utils.py
Normal file
104
utils.py
Normal file
@ -0,0 +1,104 @@
|
||||
import json
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
|
||||
|
||||
def pack_data(action, message=None, data=None):
|
||||
if data is None:
|
||||
data = b''
|
||||
if message is None:
|
||||
message = {}
|
||||
# 将字符串编码为UTF-8字节串
|
||||
action_bytes = action.encode('utf-8')
|
||||
message_bytes = json.dumps(message).encode('utf-8')
|
||||
# 使用大端序打包长度信息(4字节无符号整数)
|
||||
# >I: 大端序无符号整型(4字节)
|
||||
packed = struct.pack('>I', len(action_bytes))
|
||||
packed += action_bytes
|
||||
packed += struct.pack('>I', len(message_bytes))
|
||||
packed += message_bytes
|
||||
packed += data # 直接附加二进制数据
|
||||
return packed
|
||||
|
||||
|
||||
def unpack_data(packed):
|
||||
# 解析action长度(前4字节)
|
||||
action_len = struct.unpack('>I', packed[:4])[0]
|
||||
offset = 4
|
||||
# 提取action字节并解码
|
||||
action_bytes = packed[offset:offset + action_len]
|
||||
action = action_bytes.decode('utf-8')
|
||||
offset += action_len
|
||||
# 解析message长度(接下来的4字节)
|
||||
message_len = struct.unpack('>I', packed[offset:offset + 4])[0]
|
||||
offset += 4
|
||||
# 提取message字节并解码
|
||||
message_bytes = packed[offset:offset + message_len]
|
||||
message = json.loads(message_bytes.decode('utf-8'))
|
||||
offset += message_len
|
||||
# 剩余部分是原始二进制数据
|
||||
data = packed[offset:]
|
||||
return action, message, data
|
||||
|
||||
|
||||
def send_pack_tcp(sock, pack):
|
||||
sock.sendall(pack)
|
||||
|
||||
|
||||
def send_tcp(sock, action, data, message):
|
||||
packed = pack_data(action, message, data)
|
||||
sock.sendall(packed)
|
||||
|
||||
|
||||
def send_udp(sock, action, data, message, addr):
|
||||
if isinstance(sock, int):
|
||||
sock_ = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock_.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock_.bind(('0.0.0.0', sock))
|
||||
else:
|
||||
sock_ = sock
|
||||
packed = pack_data(action, message, data)
|
||||
sock_.sendto(packed, addr)
|
||||
if isinstance(sock, int):
|
||||
sock_.close()
|
||||
|
||||
|
||||
def send_data_udp(sock, addr, data, message=None):
|
||||
send_udp(sock, 'data', data, message, addr)
|
||||
|
||||
|
||||
def send_data_tcp(sock, data, message=None):
|
||||
send_tcp(sock, 'data', data, message)
|
||||
|
||||
|
||||
def send_action_udp(sock, addr, action, message=None):
|
||||
if isinstance(message, str):
|
||||
message = {'message': message}
|
||||
elif isinstance(message, dict):
|
||||
message['timestamp'] = int(time.time())
|
||||
else:
|
||||
try:
|
||||
str_message = str(message)
|
||||
message = {'message': str_message}
|
||||
except Exception as e:
|
||||
print(f'无法将数据转换为字符串: {e}')
|
||||
send_udp(sock, action, None, message, addr)
|
||||
|
||||
|
||||
def send_action_tcp(sock, action, message=None):
|
||||
if isinstance(message, str):
|
||||
message = {'message': message}
|
||||
send_tcp(sock, action, None, message)
|
||||
|
||||
|
||||
def recv_tcp(sock):
|
||||
pack = sock.recv(4096)
|
||||
action, message, data = unpack_data(pack)
|
||||
return action, message, data
|
||||
|
||||
|
||||
def recv_udp(sock):
|
||||
pack, addr = sock.recvfrom(4096)
|
||||
action, message, data = unpack_data(pack)
|
||||
return action, message, data, addr
|
||||
Loading…
Reference in New Issue
Block a user