AwinSimpleP2P/utiles.py
2025-06-08 23:49:38 +08:00

45 lines
1.4 KiB
Python

import json
import socket
import struct
class Package:
def __init__(self, action: str, message: dict = None, data: bytes = None):
self.action = action
self.message = message if message else {}
self.data = data if data else b''
def pack(self) -> bytes:
action_encode = self.action.encode('utf-8')
message_encode = json.dumps(self.message).encode('utf-8')
packed = struct.pack('>I', len(action_encode)) + action_encode
packed += struct.pack('>I', len(message_encode)) + message_encode
packed += self.data
return packed
@classmethod
def unpack(cls, packed):
action_len = struct.unpack('>I', packed[:4])[0]
action = packed[4:4 + action_len].decode('utf-8')
message_len = struct.unpack('>I', packed[4 + action_len:8 + action_len])[0]
message = json.loads(packed[8 + action_len:8 + action_len + message_len].decode('utf-8'))
data = packed[8 + action_len + message_len:]
return cls(action, message, data)
def get_data(self) -> bytes:
return self.data
def get_message(self) -> dict:
return self.message
def get_action(self) -> str:
return self.action
def get_all(self) -> (str, dict, bytes):
return self.action, self.message, self.data
def recv_package(sock) -> (tuple, Package):
addr, pack = sock.recvfrom(1300)
return addr, Package.unpack(pack)