diff --git a/client.py b/client.py new file mode 100644 index 0000000..22aab10 --- /dev/null +++ b/client.py @@ -0,0 +1,125 @@ +import threading +import uuid + +from utils import * + +client_token = f"client_{uuid.uuid4().hex[:8]}" +my_charactor = "client" +client_name = 'awin_client' + +server_name = 'awin_server' +service_name = 'alist' + +co_server = 'www.awin-x.top' +co_port = 5000 + +co_server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +co_server_sock.connect((co_server, co_port)) + +listen_port = 12345 +listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +listen_sock.bind(('0.0.0.0', listen_port)) +listen_sock.listen(5) + +udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +udp_sock.bind(('0.0.0.0', 0)) +tcp_port = udp_sock.getsockname()[1] + +server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + +conns = {} + + +def client_thread(conn_id): + client_sock = conns[conn_id]['client_sock'] + while True: + data = client_sock.recv(4096) + send_data_tcp(server_sock, data, {'conn_id': conn_id}) + + +def tcp_accept_thread(): + while True: + client_sock, client_addr = listen_sock.accept() + conn_id = f"conn_{uuid.uuid4().hex[:8]}" + conns[conn_id] = { + 'client_sock': client_sock, + 'client_addr': client_addr, + 'ready': False + } + send_action_tcp(server_sock, 'connect_service', { + 'service': service_name, + 'conn_id': conn_id, + 'client_addr': client_addr, + 'client_name': client_name + }) + count = 0 + while not conns[conn_id]['ready']: + if count > 50: + send_action_tcp(server_sock, 'disconnect_service', { + 'conn_id': conn_id + }) + break + time.sleep(0.1) + threading.Thread(target=client_thread, args=conn_id).start() + + +def handle_data(conn_id, data): + conns[conn_id]['client_sock'].sendall(data) + + +def server_thread(): + global client_name, client_token + send_action_tcp(server_sock, 'client_hello', { + 'client_name': client_name, + 'client_token': client_token + }) + while True: + action, message, data = recv_tcp(server_sock) + if action == 'error': + print(message['message']) + elif action == 'data': + handle_data(message['conn_id'], data) + elif action == 'bye': + break + server_sock.close() + print("bye, from server") + exit() + + +def main(): + send_action_tcp(co_server_sock, 'client_hello', { + 'name': client_name, + 'token': client_token, + 'charactor': my_charactor, + }) + send_action_tcp(co_server_sock, 'punch_request', { + 'server_name': 'awin_server' + }) + action, message, data = recv_tcp(co_server_sock) + if action == 'punch_request': + co_server_punch_port = message['co_server_punch_port'] + else: + print('punch request failed') + return + print(f'暴露端口:{tcp_port}->{(co_server, co_server_punch_port)}') + packed = pack_data(action='client_punch_port') + for i in range(5): + udp_sock.sendto(packed, (co_server, co_server_punch_port)) + time.sleep(0.1) + action, message, data = recv_tcp(co_server_sock) + if action == 'punch_to': + target_addr = tuple(message['target_addr']) + print(f'打洞到{target_addr}') + udp_sock.close() + server_sock.bind(('0.0.0.0', tcp_port)) + server_sock.connect(target_addr) + threading.Thread(target=server_thread, daemon=True).start() + else: + print(action) + print('punch to failed') + return + + +if __name__ == '__main__': + main() diff --git a/co_server.py b/co_server.py new file mode 100644 index 0000000..70a144a --- /dev/null +++ b/co_server.py @@ -0,0 +1,151 @@ +import socket +import threading +import uuid + +from utils import * + +co_server_port = 5000 +tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +tcp_sock.bind(('0.0.0.0', co_server_port)) +tcp_sock.listen(5) + +servers = {} + +clients = {} + +co_server_token = f"co_server_{uuid.uuid4().hex[:8]}" +my_charactor = 'co_server' + + +def handle_punch_request(client_name, server_name): + client = clients[client_name] + server = servers[server_name] + udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + udp_sock.bind(('0.0.0.0', 5002)) + udp_sock.settimeout(20) + co_server_punch_port = udp_sock.getsockname()[1] + print(f'向客户端和服务端发起端口暴露请求到{co_server_punch_port}') + send_action_tcp(server['sock'], 'punch_request', { + 'co_server_punch_port': co_server_punch_port + }) + send_action_tcp(client['sock'], 'punch_request', { + 'co_server_punch_port': co_server_punch_port + }) + client_punch_port = 0 + server_punch_port = 0 + count = 0 + while client_punch_port == 0 or server_punch_port == 0: + action, message, data, addr = recv_udp(udp_sock) + if action == 'server_punch_port': + print(f'获取到服务端打洞端口{addr[1]}') + server_punch_port = addr[1] + if action == 'client_punch_port': + print(f'获取到客户端打洞端口{addr[1]}') + client_punch_port = addr[1] + time.sleep(0.1) + count += 1 + udp_sock.close() + print('发送打洞请求') + send_action_tcp(client['sock'], 'punch_to', { + 'server_name': server_name, + 'server_token': server['token'], + 'target_addr': (server['addr'][0], server_punch_port) + }) + send_action_tcp(server['sock'], 'punch_to', { + 'client_name': client_name, + 'client_token': client['token'], + 'target_addr': (client['addr'][0], client_punch_port) + }) + + +def handle_server(sock, addr, server_info): + server_name = server_info['name'] + server_token = server_info['token'] + if server_name in servers: + send_action_tcp(sock, 'bye', 'server already exists') + return + send_action_tcp(sock, 'co_server_hello', { + 'charactor': my_charactor, + 'token': co_server_token, + }) + servers[server_name] = { + 'sock': sock, + 'addr': addr, + 'name': server_name, + 'token': server_token, + } + while True: + try: + action, message, data = recv_tcp(sock) + if action == 'ping': + send_action_tcp(sock, 'pong') + else: + send_action_tcp(sock, 'bye', 'unknown action') + break + except Exception as e: + print(e) + send_action_tcp(sock, 'bye', f'error: {e}') + break + sock.close() + del servers[server_name] + + +def handle_client(sock, addr, client_info): + client_name = client_info['name'] + client_token = client_info['token'] + clients[client_name] = { + 'sock': sock, + 'addr': addr, + 'token': client_token, + } + while True: + try: + action, message, data = recv_tcp(sock) + if action == 'ping': + send_action_tcp(sock, 'pong') + elif action == 'punch_request': + server_name = message['server_name'] + if server_name in servers: + handle_punch_request(client_name, server_name) + else: + send_action_tcp(sock, 'error', f"服务器不存在: {server_name}") + break + except Exception as e: + print(e) + send_action_tcp(sock, 'bye', f'error: {e}') + break + sock.close() + del clients[client_name] + + +def handle_connect(sock, addr): + try: + action, message, data = recv_tcp(sock) + except Exception as e: + print(e) + send_action_tcp(sock, 'error', f"无法解析数据") + return + if action == 'server_hello': + handle_server(sock, addr, message) + elif action == 'client_hello': + handle_client(sock, addr, message) + else: + print(f"未知连接:{addr} -> {action}") + sock.close() + + +def main(): + print(f"{my_charactor} 启动") + while True: + conn, addr = tcp_sock.accept() + print(f"{my_charactor} 收到来自 {addr} 的连接") + threading.Thread( + target=handle_connect, + args=(conn, addr), + daemon=True + ).start() + + +if __name__ == '__main__': + main() diff --git a/server1.py b/server1.py new file mode 100644 index 0000000..84189c7 --- /dev/null +++ b/server1.py @@ -0,0 +1,190 @@ +import threading +import uuid + +from utils import * + +services = { + 'ssh': 22, + 'alist': 5244, + 'minecraft': 25565 +} +server_name = 'awin_server' +co_server = 'www.awin-x.top' +co_port = 5000 + +server_token = f"server_{uuid.uuid4().hex[:8]}" +my_charactor = 'server' + +co_server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +co_server_sock.connect((co_server, co_port)) + +udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +udp_sock.bind(('0.0.0.0', 0)) +listen_port = udp_sock.getsockname()[1] + +tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +tcp_sock.bind(('0.0.0.0', listen_port)) +tcp_sock.listen(5) +print(f"监听端口: {listen_port}") + +conns = {} + +clients = {} + + +def heart_beat_thread(): + heart_beat_pack = pack_data('ping') + while True: + print('ping 到 co_server') + send_pack_tcp(co_server_sock, heart_beat_pack) + time.sleep(10) + + +def handle_punch_to(message): + client_name = message['client_name'] + client_token = message['client_token'] + target_addr = tuple(message['target_addr']) + clients[client_name] = { + 'client_token': client_token, + 'addr': target_addr, + } + print(f'打洞到{target_addr}') + for i in range(100): + send_action_udp(udp_sock, target_addr, 'punch') + time.sleep(0.01) + + +def handle_punch_request(message): + co_server_punch_port = message['co_server_punch_port'] + print(f'暴露端口:{listen_port}->{(co_server, co_server_punch_port)}') + for i in range(5): + send_action_udp(udp_sock, (co_server, co_server_punch_port), 'server_punch_port') + time.sleep(0.1) + + +def handle_bye(sock): + print(f"收到来自 {sock.getpeername()} 的断开连接请求") + try: + send_action_tcp(sock, 'bye', 'bye from server') + sock.close() + except Exception as e: + print(e) + + +def handle_data(conn_id, data): + conns[conn_id]['conn_sock'].sendall(data) + + +def forward_data_thread(conn_id): + conn = conns[conn_id] + while conn_id in conns: + data = conn['conn_sock'].recv(4096) + try: + send_data_tcp(conn['client_sock'], data, {'conn_id': conn_id}) + except Exception as e: + print(e[:10]+"->service connection breaks") + break + + +def handle_connect_service(message): + service = message['service'] + service_port = services[service] + client_name = message['client_name'] + conn_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn_sock.connect(('127.0.0.1', service_port)) + conn_id = message['conn_id'] + conns[conn_id] = { + 'conn_sock': conn_sock, + 'client_name': client_name, + 'service': service + } + threading.Thread(target=forward_data_thread, args=conn_id, daemon=True).start() + + +def handle_disconnect_service(message): + conn_id = message['conn_id'] + if conn_id in conns: + conn_sock = conns[conn_id]['conn_sock'] + conn_sock.close() + del conns[conn_id] + + +def client_thread(client_sock, client_addr): + while True: + action, message, data = recv_tcp(client_sock) + if action == 'data': + handle_data(message['conn_id'], data) + elif action == 'bye': + handle_bye(client_sock) + break + elif action == 'connect_service': + handle_connect_service(message) + elif action == 'disconnect_service': + handle_disconnect_service(message) + else: + send_action_tcp(client_sock, 'error', f"未知操作: {action}") + + +def tcp_accept_thread(): + while True: + sock, addr = tcp_sock.accept() + print(f"收到来自 {addr} 的连接") + action, message, data = recv_tcp(sock) + if action == 'client_hello': + client_name = message['client_name'] + client_token = message['client_token'] + if client_name in clients: + if clients[client_name]['client_token'] == client_token: + if 'sock' in clients[client_name]: + send_action_tcp(sock, 'error', f"{client_name} 已存在") + print(f"{client_name} 已存在") + else: + clients[client_name]['sock'] = sock + clients[client_name]['addr'] = addr + send_action_tcp(sock, 'success', f"{client_name} 已连接") + threading.Thread(target=client_thread, args=(sock, addr)).start() + print(f"{client_name} 已连接") + else: + print(f'{addr},token 错误') + else: + print(f'{addr},未注册') + else: + print(f'{addr},未知操作') + sock.close() + + +def main(): + send_action_tcp(co_server_sock, 'server_hello', { + 'name': server_name, + 'token': server_token, + 'charactor': my_charactor + }) + action, message, data = recv_tcp(co_server_sock) + if action == 'co_server_hello': + co_server_token = message['token'] + else: + print(f"连接到协服务器失败{message['message']}") + return + + threading.Thread(target=heart_beat_thread, daemon=True).start() + + while True: + action, message, data = recv_tcp(co_server_sock) + if action == 'punch_to': + handle_punch_to(message) + elif action == 'punch_request': + handle_punch_request(message) + elif action == 'pong': + print('pong 来自 co_server') + # elif action == 'data': + # handle_data(co_server_sock, co_server_addr, message, data) + elif action == 'bye': + handle_bye(co_server_sock) + else: + print(f'收到来自co_server的未知消息{action}') + + +if __name__ == '__main__': + main() diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..dc4a9b4 --- /dev/null +++ b/utils.py @@ -0,0 +1,104 @@ +import json +import socket +import struct +import time + + +def pack_data(action, message=None, data=None): + if data is None: + data = b'' + if message is None: + message = {} + # 将字符串编码为UTF-8字节串 + action_bytes = action.encode('utf-8') + message_bytes = json.dumps(message).encode('utf-8') + # 使用大端序打包长度信息(4字节无符号整数) + # >I: 大端序无符号整型(4字节) + packed = struct.pack('>I', len(action_bytes)) + packed += action_bytes + packed += struct.pack('>I', len(message_bytes)) + packed += message_bytes + packed += data # 直接附加二进制数据 + return packed + + +def unpack_data(packed): + # 解析action长度(前4字节) + action_len = struct.unpack('>I', packed[:4])[0] + offset = 4 + # 提取action字节并解码 + action_bytes = packed[offset:offset + action_len] + action = action_bytes.decode('utf-8') + offset += action_len + # 解析message长度(接下来的4字节) + message_len = struct.unpack('>I', packed[offset:offset + 4])[0] + offset += 4 + # 提取message字节并解码 + message_bytes = packed[offset:offset + message_len] + message = json.loads(message_bytes.decode('utf-8')) + offset += message_len + # 剩余部分是原始二进制数据 + data = packed[offset:] + return action, message, data + + +def send_pack_tcp(sock, pack): + sock.sendall(pack) + + +def send_tcp(sock, action, data, message): + packed = pack_data(action, message, data) + sock.sendall(packed) + + +def send_udp(sock, action, data, message, addr): + if isinstance(sock, int): + sock_ = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock_.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock_.bind(('0.0.0.0', sock)) + else: + sock_ = sock + packed = pack_data(action, message, data) + sock_.sendto(packed, addr) + if isinstance(sock, int): + sock_.close() + + +def send_data_udp(sock, addr, data, message=None): + send_udp(sock, 'data', data, message, addr) + + +def send_data_tcp(sock, data, message=None): + send_tcp(sock, 'data', data, message) + + +def send_action_udp(sock, addr, action, message=None): + if isinstance(message, str): + message = {'message': message} + elif isinstance(message, dict): + message['timestamp'] = int(time.time()) + else: + try: + str_message = str(message) + message = {'message': str_message} + except Exception as e: + print(f'无法将数据转换为字符串: {e}') + send_udp(sock, action, None, message, addr) + + +def send_action_tcp(sock, action, message=None): + if isinstance(message, str): + message = {'message': message} + send_tcp(sock, action, None, message) + + +def recv_tcp(sock): + pack = sock.recv(4096) + action, message, data = unpack_data(pack) + return action, message, data + + +def recv_udp(sock): + pack, addr = sock.recvfrom(4096) + action, message, data = unpack_data(pack) + return action, message, data, addr