"""
Custom serializer class to replace pickle. This does not use eval() when (de)serializing objects and thus
is safe from arbitrary code execution (hopefully)
Adapted from:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/415791
"""
from cStringIO import StringIO
from struct import pack, unpack
from types import IntType, TupleType, StringType, FloatType, LongType, ListType, DictType, NoneType, BooleanType, UnicodeType
from pyscrabble.game.player import User, Player, PlayerInfo
from pyscrabble.game.pieces import Letter, Move
from pyscrabble.game.game import ScrabbleGameInfo
from pyscrabble.lookup import ServerMessage
from pyscrabble.util import Time, TimeDeltaWrapper,ServerBulletin, PrivateMessage
from time import struct_time
from datetime import timedelta
import zlib
class EncodeError(Exception): pass
class DecodeError(Exception): pass
HEADER = "SRW3"
protocol = {
TupleType : "T",
ListType : "L",
DictType : "D",
LongType : "B",
IntType : "I",
FloatType : "F",
StringType : "S",
NoneType : "N",
BooleanType : "b",
UnicodeType : "U",
User : "u",
struct_time : "t",
Letter : "l",
Move : "m",
ScrabbleGameInfo : "g",
Player : "p",
ServerMessage : "s",
PrivateMessage : "r",
ServerBulletin : "a",
PlayerInfo : "c",
timedelta : "d",
Time : "e",
TimeDeltaWrapper : "f"
}
encoder = {}
class register_encoder_for_type(object):
"""Registers an encoder function, for a type, in the global encoder dictionary."""
def __init__(self, t):
self.type = t
def __call__(self, func):
encoder[self.type] = func
return func
#contains dictionary of decoding functions, where the dictionary key is the type prefix used.
decoder = {}
class register_decoder_for_type(object):
"""Registers a decoder function, for a prefix, in the global decoder dictionary."""
def __init__(self, t):
self.prefix = protocol[t]
def __call__(self, func):
decoder[self.prefix] = func
return func
## ##
@register_encoder_for_type(DictType)
def enc_dict_type(obj):
data = "".join([encoder[type(i)](i) for i in obj.items()])
return "%s%s%s" % ("D", pack("!L", len(data)), data)
@register_encoder_for_type(struct_time)
def enc_struct_time_type(obj):
data = "".join([encoder[type(i)](i) for i in obj])
return "%s%s%s" % (protocol[struct_time], pack("!L", len(data)), data)
@register_encoder_for_type(TupleType)
@register_encoder_for_type(ListType)
def enc_list_type(obj):
data = "".join([encoder[type(i)](i) for i in obj])
return "%s%s%s" % (protocol[type(obj)], pack("!L", len(data)), data)
@register_encoder_for_type(IntType)
def enc_int_type(obj):
return "%s%s" % (protocol[IntType], pack("!i", obj))
@register_encoder_for_type(FloatType)
def enc_float_type(obj):
return "%s%s" % (protocol[FloatType], pack("!d", obj))
@register_encoder_for_type(LongType)
def enc_long_type(obj):
obj = hex(obj)[2:-1]
return "%s%s%s" % (protocol[LongType], pack("!L", len(obj)), obj)
@register_encoder_for_type(UnicodeType)
def enc_unicode_type(obj):
obj = obj.encode('utf-8')
return "%s%s%s" % (protocol[UnicodeType], pack("!L", len(obj)), obj)
@register_encoder_for_type(StringType)
def enc_string_type(obj):
return "%s%s%s" % (protocol[StringType], pack("!L", len(obj)), obj)
@register_encoder_for_type(NoneType)
def enc_none_type(obj):
return protocol[NoneType]
@register_encoder_for_type(BooleanType)
def enc_bool_type(obj):
return protocol[BooleanType] + str(int(obj))
@register_encoder_for_type(User)
@register_encoder_for_type(Player)
@register_encoder_for_type(Move)
@register_encoder_for_type(Letter)
@register_encoder_for_type(ScrabbleGameInfo)
@register_encoder_for_type(ServerMessage)
@register_encoder_for_type(PrivateMessage)
@register_encoder_for_type(ServerBulletin)
@register_encoder_for_type(PlayerInfo)
@register_encoder_for_type(Time)
@register_encoder_for_type(TimeDeltaWrapper)
def enc_obj_type(obj):
data = "".join([encoder[type(i)](i) for i in obj.__dict__.items()])
return "%s%s%s" % (protocol[type(obj)], pack("!L", len(data)), data)
@register_encoder_for_type(timedelta)
def enc_time_delta_type(obj):
data = (obj.days, obj.seconds)
data = "".join([encoder[type(i)](i) for i in data])
return "%s%s%s" % (protocol[type(obj)], pack("!L", len(data)), data)
def dumps(obj, compress=False):
"""Encode simple Python types into a binary string."""
option = "N"
if compress: option = "Z"
try:
data = encoder[type(obj)](obj)
if compress: data = zlib.compress(data)
x = "%s%s%s" % (HEADER, option, data)
#print x
#print "dumps len %d" % len(x)
return x
except KeyError, e:
raise EncodeError, "Type not supported. (%s) (%s)" % (e, str(obj.__class__))
## ##
## ##
def build_sequence(data, cast=list):
size = unpack('!L', data.read(4))[0]
items = []
data_tell = data.tell
data_read = data.read
items_append = items.append
start_position = data.tell()
while (data_tell() - start_position) < size:
T = data_read(1)
value = decoder[T](data)
items_append(value)
return cast(items)
@register_decoder_for_type(struct_time)
@register_decoder_for_type(TupleType)
def dec_tuple_type(data):
return build_sequence(data, cast=tuple)
@register_decoder_for_type(ListType)
def dec_list_type(data):
return build_sequence(data, cast=list)
@register_decoder_for_type(DictType)
def dec_dict_type(data):
return build_sequence(data, cast=dict)
@register_decoder_for_type(LongType)
def dec_long_type(data):
size = unpack('!L', data.read(4))[0]
value = long(data.read(size),16)
return value
@register_decoder_for_type(StringType)
def dec_string_type(data):
size = unpack('!L', data.read(4))[0]
value = str(data.read(size))
return value
@register_decoder_for_type(FloatType)
def dec_float_type(data):
value = unpack('!d', data.read(8))[0]
return value
@register_decoder_for_type(IntType)
def dec_int_type(data):
value = unpack('!i', data.read(4))[0]
return value
@register_decoder_for_type(NoneType)
def dec_none_type(data):
return None
@register_decoder_for_type(BooleanType)
def dec_bool_type(data):
value = int(data.read(1))
return bool(value)
@register_decoder_for_type(UnicodeType)
def dec_unicode_type(data):
size = unpack('!L', data.read(4))[0]
value = data.read(size).decode('utf-8')
return value
@register_decoder_for_type(User)
def dec_user_type(data):
u = User()
u.__dict__ = dec_dict_type(data)
return u
@register_decoder_for_type(struct_time)
def dec_struct_time_type(data):
return build_sequence(data, cast=tuple)
@register_decoder_for_type(Move)
def dec_move_type(data):
obj = Move()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(Letter)
def dec_letter_type(data):
obj = Letter()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(ScrabbleGameInfo)
def dec_sg_type(data):
obj = ScrabbleGameInfo()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(Player)
def dec_player_type(data):
obj = Player()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(ServerMessage)
def dec_server_message_type(data):
obj = ServerMessage()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(PrivateMessage)
def dec_private_message_type(data):
obj = PrivateMessage()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(ServerBulletin)
def dec_bulletin_type(data):
obj = ServerBulletin()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(PlayerInfo)
def dec_player_info_type(data):
obj = PlayerInfo()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(timedelta)
def dec_time_delta_type(data):
days,seconds = dec_tuple_type(data)
obj = timedelta(days=days, seconds=seconds)
return obj
@register_decoder_for_type(Time)
def dec_time_type(data):
obj = Time()
obj.__dict__ = dec_dict_type(data)
return obj
@register_decoder_for_type(TimeDeltaWrapper)
def dec_time_delta_wrapper_type(data):
obj = TimeDeltaWrapper()
obj.__dict__ = dec_dict_type(data)
return obj
def loads(data):
"""
Decode a binary string into the original Python types.
"""
#print "loads %s" % str(data)
buffer = StringIO(data)
header = buffer.read(len(HEADER))
assert header == HEADER
option = buffer.read(1)
#print "loads len %d" % len(data)
if option == "Z":
buffer = StringIO(zlib.decompress(buffer.read()))
try:
value = decoder[buffer.read(1)](buffer)
except KeyError, e:
raise DecodeError, "Type prefix not supported. (%s)" % e
return value
## ##