""" Custom serializer class to replace pickle. This does not use eval() when (de)serializing objects and thus is safe from arbitrary code execution (hopefully) Adapted from: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/415791 """ from cStringIO import StringIO from struct import pack, unpack from types import IntType, TupleType, StringType, FloatType, LongType, ListType, DictType, NoneType, BooleanType, UnicodeType from pyscrabble.game.player import User, Player, PlayerInfo from pyscrabble.game.pieces import Letter, Move from pyscrabble.game.game import ScrabbleGameInfo from pyscrabble.lookup import ServerMessage from pyscrabble.util import Time, TimeDeltaWrapper,ServerBulletin, PrivateMessage from time import struct_time from datetime import timedelta import zlib class EncodeError(Exception): pass class DecodeError(Exception): pass HEADER = "SRW3" protocol = { TupleType : "T", ListType : "L", DictType : "D", LongType : "B", IntType : "I", FloatType : "F", StringType : "S", NoneType : "N", BooleanType : "b", UnicodeType : "U", User : "u", struct_time : "t", Letter : "l", Move : "m", ScrabbleGameInfo : "g", Player : "p", ServerMessage : "s", PrivateMessage : "r", ServerBulletin : "a", PlayerInfo : "c", timedelta : "d", Time : "e", TimeDeltaWrapper : "f" } encoder = {} class register_encoder_for_type(object): """Registers an encoder function, for a type, in the global encoder dictionary.""" def __init__(self, t): self.type = t def __call__(self, func): encoder[self.type] = func return func #contains dictionary of decoding functions, where the dictionary key is the type prefix used. decoder = {} class register_decoder_for_type(object): """Registers a decoder function, for a prefix, in the global decoder dictionary.""" def __init__(self, t): self.prefix = protocol[t] def __call__(self, func): decoder[self.prefix] = func return func ## ## @register_encoder_for_type(DictType) def enc_dict_type(obj): data = "".join([encoder[type(i)](i) for i in obj.items()]) return "%s%s%s" % ("D", pack("!L", len(data)), data) @register_encoder_for_type(struct_time) def enc_struct_time_type(obj): data = "".join([encoder[type(i)](i) for i in obj]) return "%s%s%s" % (protocol[struct_time], pack("!L", len(data)), data) @register_encoder_for_type(TupleType) @register_encoder_for_type(ListType) def enc_list_type(obj): data = "".join([encoder[type(i)](i) for i in obj]) return "%s%s%s" % (protocol[type(obj)], pack("!L", len(data)), data) @register_encoder_for_type(IntType) def enc_int_type(obj): return "%s%s" % (protocol[IntType], pack("!i", obj)) @register_encoder_for_type(FloatType) def enc_float_type(obj): return "%s%s" % (protocol[FloatType], pack("!d", obj)) @register_encoder_for_type(LongType) def enc_long_type(obj): obj = hex(obj)[2:-1] return "%s%s%s" % (protocol[LongType], pack("!L", len(obj)), obj) @register_encoder_for_type(UnicodeType) def enc_unicode_type(obj): obj = obj.encode('utf-8') return "%s%s%s" % (protocol[UnicodeType], pack("!L", len(obj)), obj) @register_encoder_for_type(StringType) def enc_string_type(obj): return "%s%s%s" % (protocol[StringType], pack("!L", len(obj)), obj) @register_encoder_for_type(NoneType) def enc_none_type(obj): return protocol[NoneType] @register_encoder_for_type(BooleanType) def enc_bool_type(obj): return protocol[BooleanType] + str(int(obj)) @register_encoder_for_type(User) @register_encoder_for_type(Player) @register_encoder_for_type(Move) @register_encoder_for_type(Letter) @register_encoder_for_type(ScrabbleGameInfo) @register_encoder_for_type(ServerMessage) @register_encoder_for_type(PrivateMessage) @register_encoder_for_type(ServerBulletin) @register_encoder_for_type(PlayerInfo) @register_encoder_for_type(Time) @register_encoder_for_type(TimeDeltaWrapper) def enc_obj_type(obj): data = "".join([encoder[type(i)](i) for i in obj.__dict__.items()]) return "%s%s%s" % (protocol[type(obj)], pack("!L", len(data)), data) @register_encoder_for_type(timedelta) def enc_time_delta_type(obj): data = (obj.days, obj.seconds) data = "".join([encoder[type(i)](i) for i in data]) return "%s%s%s" % (protocol[type(obj)], pack("!L", len(data)), data) def dumps(obj, compress=False): """Encode simple Python types into a binary string.""" option = "N" if compress: option = "Z" try: data = encoder[type(obj)](obj) if compress: data = zlib.compress(data) x = "%s%s%s" % (HEADER, option, data) #print x #print "dumps len %d" % len(x) return x except KeyError, e: raise EncodeError, "Type not supported. (%s) (%s)" % (e, str(obj.__class__)) ## ## ## ## def build_sequence(data, cast=list): size = unpack('!L', data.read(4))[0] items = [] data_tell = data.tell data_read = data.read items_append = items.append start_position = data.tell() while (data_tell() - start_position) < size: T = data_read(1) value = decoder[T](data) items_append(value) return cast(items) @register_decoder_for_type(struct_time) @register_decoder_for_type(TupleType) def dec_tuple_type(data): return build_sequence(data, cast=tuple) @register_decoder_for_type(ListType) def dec_list_type(data): return build_sequence(data, cast=list) @register_decoder_for_type(DictType) def dec_dict_type(data): return build_sequence(data, cast=dict) @register_decoder_for_type(LongType) def dec_long_type(data): size = unpack('!L', data.read(4))[0] value = long(data.read(size),16) return value @register_decoder_for_type(StringType) def dec_string_type(data): size = unpack('!L', data.read(4))[0] value = str(data.read(size)) return value @register_decoder_for_type(FloatType) def dec_float_type(data): value = unpack('!d', data.read(8))[0] return value @register_decoder_for_type(IntType) def dec_int_type(data): value = unpack('!i', data.read(4))[0] return value @register_decoder_for_type(NoneType) def dec_none_type(data): return None @register_decoder_for_type(BooleanType) def dec_bool_type(data): value = int(data.read(1)) return bool(value) @register_decoder_for_type(UnicodeType) def dec_unicode_type(data): size = unpack('!L', data.read(4))[0] value = data.read(size).decode('utf-8') return value @register_decoder_for_type(User) def dec_user_type(data): u = User() u.__dict__ = dec_dict_type(data) return u @register_decoder_for_type(struct_time) def dec_struct_time_type(data): return build_sequence(data, cast=tuple) @register_decoder_for_type(Move) def dec_move_type(data): obj = Move() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(Letter) def dec_letter_type(data): obj = Letter() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(ScrabbleGameInfo) def dec_sg_type(data): obj = ScrabbleGameInfo() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(Player) def dec_player_type(data): obj = Player() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(ServerMessage) def dec_server_message_type(data): obj = ServerMessage() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(PrivateMessage) def dec_private_message_type(data): obj = PrivateMessage() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(ServerBulletin) def dec_bulletin_type(data): obj = ServerBulletin() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(PlayerInfo) def dec_player_info_type(data): obj = PlayerInfo() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(timedelta) def dec_time_delta_type(data): days,seconds = dec_tuple_type(data) obj = timedelta(days=days, seconds=seconds) return obj @register_decoder_for_type(Time) def dec_time_type(data): obj = Time() obj.__dict__ = dec_dict_type(data) return obj @register_decoder_for_type(TimeDeltaWrapper) def dec_time_delta_wrapper_type(data): obj = TimeDeltaWrapper() obj.__dict__ = dec_dict_type(data) return obj def loads(data): """ Decode a binary string into the original Python types. """ #print "loads %s" % str(data) buffer = StringIO(data) header = buffer.read(len(HEADER)) assert header == HEADER option = buffer.read(1) #print "loads len %d" % len(data) if option == "Z": buffer = StringIO(zlib.decompress(buffer.read())) try: value = decoder[buffer.read(1)](buffer) except KeyError, e: raise DecodeError, "Type prefix not supported. (%s)" % e return value ## ##