blob: 6f2bfe343dde46c3d5a3adf6b4711c36c7cf4d86 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) The PyAMF Project.
# See LICENSE.txt for details.
"""
Tests for AMF0 Implementation.
@since: 0.1.0
"""
import unittest
import datetime
import pyamf
from pyamf import amf0, util, xml, python
from pyamf.tests.util import (
EncoderMixIn, DecoderMixIn, ClassCacheClearingTestCase, Spam, ClassicSpam)
class TypesTestCase(unittest.TestCase):
"""
Tests the type mappings.
"""
def test_types(self):
self.assertEqual(amf0.TYPE_NUMBER, '\x00')
self.assertEqual(amf0.TYPE_BOOL, '\x01')
self.assertEqual(amf0.TYPE_STRING, '\x02')
self.assertEqual(amf0.TYPE_OBJECT, '\x03')
self.assertEqual(amf0.TYPE_MOVIECLIP, '\x04')
self.assertEqual(amf0.TYPE_NULL, '\x05')
self.assertEqual(amf0.TYPE_UNDEFINED, '\x06')
self.assertEqual(amf0.TYPE_REFERENCE, '\x07')
self.assertEqual(amf0.TYPE_MIXEDARRAY, '\x08')
self.assertEqual(amf0.TYPE_OBJECTTERM, '\x09')
self.assertEqual(amf0.TYPE_ARRAY, '\x0a')
self.assertEqual(amf0.TYPE_DATE, '\x0b')
self.assertEqual(amf0.TYPE_LONGSTRING, '\x0c')
self.assertEqual(amf0.TYPE_UNSUPPORTED, '\x0d')
self.assertEqual(amf0.TYPE_RECORDSET, '\x0e')
self.assertEqual(amf0.TYPE_XML, '\x0f')
self.assertEqual(amf0.TYPE_TYPEDOBJECT, '\x10')
self.assertEqual(amf0.TYPE_AMF3, '\x11')
class EncoderTestCase(ClassCacheClearingTestCase, EncoderMixIn):
"""
Tests the output from the AMF0 L{Encoder<pyamf.amf0.Encoder>} class.
"""
amf_type = pyamf.AMF0
def setUp(self):
ClassCacheClearingTestCase.setUp(self)
EncoderMixIn.setUp(self)
def test_number(self):
self.assertEncoded(0, '\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEncoded(0.2, '\x00\x3f\xc9\x99\x99\x99\x99\x99\x9a')
self.assertEncoded(1, '\x00\x3f\xf0\x00\x00\x00\x00\x00\x00')
self.assertEncoded(42, '\x00\x40\x45\x00\x00\x00\x00\x00\x00')
self.assertEncoded(-123, '\x00\xc0\x5e\xc0\x00\x00\x00\x00\x00')
self.assertEncoded(1.23456789, '\x00\x3f\xf3\xc0\xca\x42\x83\xde\x1b')
def test_boolean(self):
self.assertEncoded(True, '\x01\x01')
self.assertEncoded(False, '\x01\x00')
def test_string(self):
self.assertEncoded('', '\x02\x00\x00')
self.assertEncoded('hello', '\x02\x00\x05hello')
# unicode taken from http://www.columbia.edu/kermit/utf8.html
self.assertEncoded(u'ᚠᛇᚻ', '\x02\x00\t\xe1\x9a\xa0\xe1\x9b\x87\xe1\x9a\xbb')
def test_null(self):
self.assertEncoded(None, '\x05')
def test_undefined(self):
self.assertEncoded(pyamf.Undefined, '\x06')
def test_list(self):
self.assertEncoded([], '\x0a\x00\x00\x00\x00')
self.assertEncoded([1, 2, 3],
'\x0a\x00\x00\x00\x03\x00\x3f\xf0\x00\x00\x00\x00\x00\x00\x00\x40'
'\x00\x00\x00\x00\x00\x00\x00\x00\x40\x08\x00\x00\x00\x00\x00\x00')
self.assertEncoded((1, 2, 3), '\x0a\x00\x00\x00\x03\x00\x3f\xf0\x00'
'\x00\x00\x00\x00\x00\x00\x40\x00\x00\x00\x00\x00\x00\x00\x00'
'\x40\x08\x00\x00\x00\x00\x00\x00')
def test_list_references(self):
x = []
self.assertEqual(self.encode(x, x), '\n\x00\x00\x00\x00\x07\x00\x00')
def test_longstring(self):
s = 'a' * 65537
self.assertEncoded(s, '\x0c\x00\x01\x00\x01' + s)
def test_dict(self):
self.assertEncoded({'a': 'a'}, '\x03\x00\x01a\x02\x00\x01a\x00\x00\t')
self.assertEncoded({12: True, 42: "Testing"}, '\x03', (
'\x00\x0242\x02\x00\x07Testing',
'\x00\x0212\x01\x01'
), '\x00\x00\t')
def test_mixed_array(self):
d = pyamf.MixedArray(a=1, b=2, c=3)
bytes = ('\x08\x00\x00\x00\x00', (
'\x00\x01a\x00?\xf0\x00\x00\x00\x00\x00\x00',
'\x00\x01c\x00@\x08\x00\x00\x00\x00\x00\x00',
'\x00\x01b\x00@\x00\x00\x00\x00\x00\x00\x00'
), '\x00\x00\t')
self.assertEncoded(d, bytes)
# test the reference
self.assertEqual(self.encode(d), '\x07\x00\x00')
def test_date(self):
self.assertEncoded(datetime.datetime(2005, 3, 18, 1, 58, 31),
'\x0bBp+6!\x15\x80\x00\x00\x00')
self.assertEncoded(datetime.date(2003, 12, 1),
'\x0bBo%\xe2\xb2\x80\x00\x00\x00\x00')
self.assertEncoded(datetime.datetime(2009, 3, 8, 23, 30, 47, 770122),
'\x0bBq\xfe\x86\xca5\xa1\xf4\x00\x00')
self.assertRaises(pyamf.EncodeError, self.encode, datetime.time(22, 3))
def test_xml(self):
blob = '<a><b>hello world</b></a>'
self.assertEncoded(xml.fromstring(blob),
'\x0f\x00\x00\x00\x19' + blob)
def test_xml_references(self):
blob = '<a><b>hello world</b></a>'
x = xml.fromstring(blob)
self.assertEncoded([x, x], '\n\x00\x00\x00\x02' +
('\x0f\x00\x00\x00\x19' + blob) * 2)
def test_object(self):
self.assertEncoded({'a': 'b'}, '\x03\x00\x01a\x02\x00\x01b\x00\x00\x09')
def test_force_amf3(self):
alias = pyamf.register_class(Spam, 'spam.eggs')
alias.amf3 = True
x = Spam()
x.x = 'y'
self.assertEncoded(x, '\x11\n\x0b\x13spam.eggs\x03x\x06\x03y\x01')
def test_typed_object(self):
pyamf.register_class(Spam, alias='org.pyamf.spam')
x = Spam()
x.baz = 'hello'
self.assertEncoded(x, '\x10\x00\x0eorg.pyamf.spam\x00\x03baz'
'\x02\x00\x05hello\x00\x00\t')
def test_complex_list(self):
self.assertEncoded([[1.0]], '\x0a\x00\x00\x00\x01\x0a\x00\x00\x00\x01'
'\x00\x3f\xf0\x00\x00\x00\x00\x00\x00')
self.assertEncoded([['test', 'test', 'test', 'test']],
'\x0a\x00\x00\x00\x01\x0a\x00\x00\x00\x04' + ('\x02\x00\x04test' * 4))
x = {'a': 'spam', 'b': 'eggs'}
self.assertEncoded([[x, x]], '\n\x00\x00\x00\x01\n\x00\x00\x00\x02'
'\x03\x00\x01a\x02\x00\x04spam\x00\x01b\x02\x00\x04eggs\x00\x00'
'\t\x07\x00\x02')
def test_amf3(self):
self.encoder.use_amf3 = True
o = Spam()
self.assertEncoded(o, '\x11\n\x0b\x01\x01')
def test_anonymous(self):
pyamf.register_class(Spam)
x = Spam()
x.spam = 'eggs'
x.hello = 'world'
self.assertEncoded(x, '\x03', ('\x00\x05hello\x02\x00\x05world',
'\x00\x04spam\x02\x00\x04eggs'), '\x00\x00\t')
def test_dynamic(self):
x = Spam()
x.foo = 'bar'
x.hello = 'world'
alias = pyamf.register_class(Spam)
alias.exclude_attrs = ['foo']
alias.compile()
self.assertTrue(alias.dynamic)
self.assertEncoded(x, '\x03\x00\x05hello\x02\x00\x05world\x00\x00\t')
def test_dynamic_static(self):
x = Spam()
x.foo = 'bar'
x.hello = 'world'
alias = pyamf.register_class(Spam)
alias.static_attrs = ['hello']
alias.compile()
self.assertTrue(alias.dynamic)
self.assertEncoded(x, '\x03', ('\x00\x05hello\x02\x00\x05world',
'\x00\x03foo\x02\x00\x03bar'), '\x00\x00\t')
def test_dynamic_registered(self):
x = Spam()
x.foo = 'bar'
x.hello = 'world'
alias = pyamf.register_class(Spam, 'x')
alias.exclude_attrs = ['foo']
alias.compile()
self.assertTrue(alias.dynamic)
self.assertEncoded(x, '\x10\x00\x01x', '\x00\x05hello\x02\x00\x05world',
'\x00\x00\t')
def test_custom_type(self):
def write_as_list(list_interface_obj, encoder):
list_interface_obj.ran = True
self.assertEqual(id(encoder), id(self.encoder))
return list(list_interface_obj)
class ListWrapper(object):
ran = False
def __iter__(self):
return iter([1, 2, 3])
pyamf.add_type(ListWrapper, write_as_list)
x = ListWrapper()
self.encoder.writeElement(x)
self.assertEqual(x.ran, True)
self.assertEqual(self.buf.getvalue(), '\n\x00\x00\x00\x03\x00?\xf0'
'\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00@'
'\x08\x00\x00\x00\x00\x00\x00')
def test_old_style_classes(self):
class Person:
pass
pyamf.register_class(Person, 'spam.eggs.Person')
u = Person()
u.family_name = 'Doe'
u.given_name = 'Jane'
self.encoder.writeElement(u)
self.assertEncoded(u, '\x10\x00\x10spam.eggs.Person', (
'\x00\x0bfamily_name\x02\x00\x03Doe',
'\x00\ngiven_name\x02\x00\x04Jane'
), '\x00\x00\t')
def test_slots(self):
class Person(object):
__slots__ = ('family_name', 'given_name')
u = Person()
u.family_name = 'Doe'
u.given_name = 'Jane'
self.assertEncoded(u, '\x03', (
'\x00\x0bfamily_name\x02\x00\x03Doe',
'\x00\ngiven_name\x02\x00\x04Jane'
), '\x00\x00\t')
def test_slots_registered(self):
class Person(object):
__slots__ = ('family_name', 'given_name')
u = Person()
u.family_name = 'Doe'
u.given_name = 'Jane'
pyamf.register_class(Person, 'spam.eggs.Person')
self.assertEncoded(u, '\x10\x00\x10spam.eggs.Person', (
'\x00\x0bfamily_name\x02\x00\x03Doe',
'\x00\ngiven_name\x02\x00\x04Jane'
), '\x00\x00\t')
def test_elementtree_tag(self):
"""
Pretend to look like an ElementTree object to try to fool PyAMF into
encoding an xml type.
"""
class NotAnElement(object):
items = lambda self: []
def __iter__(self):
return iter([])
foo = NotAnElement()
foo.tag = 'foo'
foo.text = 'bar'
foo.tail = None
self.assertEncoded(foo, '\x03', (
'\x00\x04text\x02\x00\x03bar',
'\x00\x04tail\x05',
'\x00\x03tag\x02\x00\x03foo',
), '\x00\x00\t')
def test_funcs(self):
def x():
pass
for i in (chr, self.assertRaises, lambda x: x, pyamf):
self.assertRaises(pyamf.EncodeError, self.encoder.writeElement, i)
def test_external_subclassed_list(self):
class L(list):
class __amf__:
external = True
def __readamf__(self, o):
pass
def __writeamf__(self, o):
pass
pyamf.register_class(L, 'a')
a = L()
a.append('foo')
a.append('bar')
self.assertEncoded(a, '\x10\x00\x01a\x00\x00\t')
def test_nonexternal_subclassed_list(self):
class L(list):
pass
pyamf.register_class(L, 'a')
a = L()
a.append('foo')
a.append('bar')
self.assertEncoded(a, '\n\x00\x00\x00\x02\x02\x00\x03foo\x02\x00\x03bar')
def test_amf3_xml(self):
self.encoder.use_amf3 = True
blob = '<root><sections><section /><section /></sections></root>'
blob = xml.tostring(xml.fromstring(blob))
bytes = self.encode(xml.fromstring(blob))
buf = util.BufferedByteStream(bytes)
self.assertEqual(buf.read_uchar(), 17)
self.assertEqual(buf.read_uchar(), 11)
self.assertEqual(buf.read_uchar() >> 1, buf.remaining())
self.assertEqual(buf.read(), blob)
def test_use_amf3(self):
self.encoder.use_amf3 = True
x = {'foo': 'bar', 'baz': 'gak'}
self.assertEncoded(x, '\x11\n\x0b', ('\x01\x07foo\x06\x07bar',
'\x07baz\x06\x07gak\x01'))
def test_static_attrs(self):
class Foo(object):
class __amf__:
static = ('foo', 'bar')
pyamf.register_class(Foo)
x = Foo()
x.foo = 'baz'
x.bar = 'gak'
self.assertEncoded(x, '\x03', ('\x00\x03bar\x02\x00\x03gak',
'\x00\x03foo\x02\x00\x03baz'), '\x00\x00\t')
def test_class(self):
class Classic:
pass
class New(object):
pass
self.assertRaises(pyamf.EncodeError, self.encoder.writeElement, Classic)
self.assertRaises(pyamf.EncodeError, self.encoder.writeElement, New)
def test_timezone(self):
d = datetime.datetime(2009, 9, 24, 14, 23, 23)
self.encoder.timezone_offset = datetime.timedelta(hours=-5)
self.assertEncoded(d, '\x0bBr>\xd8\x1f\xff\x80\x00\x00\x00')
def test_generators(self):
def foo():
yield [1, 2, 3]
yield '\xff'
yield pyamf.Undefined
self.assertEncoded(foo(), '\n\x00\x00\x00\x03\x00?\xf0\x00\x00\x00\x00'
'\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00@\x08\x00\x00\x00\x00'
'\x00\x00\x02\x00\x01\xff\x06')
def test_iterate(self):
self.assertRaises(StopIteration, self.encoder.next)
self.encoder.send('')
self.encoder.send('hello')
self.encoder.send(u'ƒøø')
self.assertEqual(self.encoder.next(), '\x02\x00\x00')
self.assertEqual(self.encoder.next(), '\x02\x00\x05hello')
self.assertEqual(self.encoder.next(), '\x02\x00\x06\xc6\x92\xc3\xb8\xc3\xb8')
self.assertRaises(StopIteration, self.encoder.next)
self.assertIdentical(iter(self.encoder), self.encoder)
self.assertEqual(self.buf.getvalue(),
'\x02\x00\x00\x02\x00\x05hello\x02\x00\x06\xc6\x92\xc3\xb8\xc3\xb8')
class DecoderTestCase(ClassCacheClearingTestCase, DecoderMixIn):
"""
Tests the output from the AMF0 L{Decoder<pyamf.amf0.Decoder>} class.
"""
amf_type = pyamf.AMF0
def setUp(self):
ClassCacheClearingTestCase.setUp(self)
DecoderMixIn.setUp(self)
def test_undefined(self):
self.assertDecoded(pyamf.Undefined, '\x06')
def test_number(self):
self.assertDecoded(0, '\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertDecoded(0.2, '\x00\x3f\xc9\x99\x99\x99\x99\x99\x9a')
self.assertDecoded(1, '\x00\x3f\xf0\x00\x00\x00\x00\x00\x00')
self.assertDecoded(42, '\x00\x40\x45\x00\x00\x00\x00\x00\x00')
self.assertDecoded(-123, '\x00\xc0\x5e\xc0\x00\x00\x00\x00\x00')
self.assertDecoded(1.23456789, '\x00\x3f\xf3\xc0\xca\x42\x83\xde\x1b')
def test_number_types(self):
nr_types = [
('\x00\x00\x00\x00\x00\x00\x00\x00\x00', int),
('\x00\x3f\xc9\x99\x99\x99\x99\x99\x9a', float),
('\x00\x3f\xf0\x00\x00\x00\x00\x00\x00', int),
('\x00\x40\x45\x00\x00\x00\x00\x00\x00', int),
('\x00\xc0\x5e\xc0\x00\x00\x00\x00\x00', int),
('\x00\x3f\xf3\xc0\xca\x42\x83\xde\x1b', float),
('\x00\xff\xf8\x00\x00\x00\x00\x00\x00', float), # nan
('\x00\xff\xf0\x00\x00\x00\x00\x00\x00', float), # -inf
('\x00\x7f\xf0\x00\x00\x00\x00\x00\x00', float), # inf
]
for t in nr_types:
bytes, expected_type = t
self.buf.truncate()
self.buf.write(bytes)
self.buf.seek(0)
self.assertEqual(type(self.decoder.readElement()), expected_type)
def test_infinites(self):
self.buf.truncate()
self.buf.write('\x00\xff\xf8\x00\x00\x00\x00\x00\x00')
self.buf.seek(0)
x = self.decoder.readElement()
self.assertTrue(python.isNaN(x))
self.buf.truncate()
self.buf.write('\x00\xff\xf0\x00\x00\x00\x00\x00\x00')
self.buf.seek(0)
x = self.decoder.readElement()
self.assertTrue(python.isNegInf(x))
self.buf.truncate()
self.buf.write('\x00\x7f\xf0\x00\x00\x00\x00\x00\x00')
self.buf.seek(0)
x = self.decoder.readElement()
self.assertTrue(python.isPosInf(x))
def test_boolean(self):
self.assertDecoded(True, '\x01\x01')
self.assertDecoded(False, '\x01\x00')
def test_string(self):
self.assertDecoded('', '\x02\x00\x00')
self.assertDecoded('hello', '\x02\x00\x05hello')
self.assertDecoded(u'ᚠᛇᚻ',
'\x02\x00\t\xe1\x9a\xa0\xe1\x9b\x87\xe1\x9a\xbb')
def test_longstring(self):
a = 'a' * 65537
self.assertDecoded(a, '\x0c\x00\x01\x00\x01' + a)
def test_null(self):
self.assertDecoded(None, '\x05')
def test_list(self):
self.assertDecoded([], '\x0a\x00\x00\x00\x00')
self.assertDecoded([1, 2, 3], '\x0a\x00\x00\x00\x03\x00\x3f\xf0\x00'
'\x00\x00\x00\x00\x00\x00\x40\x00\x00\x00\x00\x00\x00\x00\x00\x40'
'\x08\x00\x00\x00\x00\x00\x00')
def test_dict(self):
bytes = '\x08\x00\x00\x00\x00\x00\x01\x61\x02\x00\x01\x61\x00\x00\x09'
self.assertDecoded({'a': 'a'}, bytes)
self.buf.write(bytes)
self.buf.seek(0)
d = self.decoder.readElement()
def test_mixed_array(self):
bytes = ('\x08\x00\x00\x00\x00\x00\x01a\x00?\xf0\x00\x00\x00\x00\x00'
'\x00\x00\x01c\x00@\x08\x00\x00\x00\x00\x00\x00\x00\x01b\x00@\x00'
'\x00\x00\x00\x00\x00\x00\x00\x00\t')
self.assertDecoded(pyamf.MixedArray(a=1, b=2, c=3), bytes)
self.buf.write(bytes)
self.buf.seek(0)
d = self.decoder.readElement()
def test_date(self):
self.assertDecoded(datetime.datetime(2005, 3, 18, 1, 58, 31),
'\x0bBp+6!\x15\x80\x00\x00\x00')
self.assertDecoded(datetime.datetime(2009, 3, 8, 23, 30, 47, 770122),
'\x0bBq\xfe\x86\xca5\xa1\xf4\x00\x00')
def test_xml(self):
e = '<a><b>hello world</b></a>'
ret = self.decode('\x0f\x00\x00\x00\x19' + e)
self.assertEqual(xml.tostring(ret), e)
def test_xml_references(self):
self.buf.truncate(0)
self.buf.write('\x0f\x00\x00\x00\x19<a><b>hello world</b></a>'
'\x07\x00\x00')
self.buf.seek(0)
self.assertEqual(
xml.tostring(xml.fromstring('<a><b>hello world</b></a>')),
xml.tostring(self.decoder.readElement()))
self.assertEqual(
xml.tostring(xml.fromstring('<a><b>hello world</b></a>')),
xml.tostring(self.decoder.readElement()))
def test_object(self):
bytes = '\x03\x00\x01a\x02\x00\x01b\x00\x00\x09'
self.assertDecoded({'a': 'b'}, bytes)
self.buf.write(bytes)
self.buf.seek(0)
d = self.decoder.readElement()
def test_registered_class(self):
pyamf.register_class(Spam, alias='org.pyamf.spam')
bytes = ('\x10\x00\x0eorg.pyamf.spam\x00\x03baz'
'\x02\x00\x05hello\x00\x00\x09')
obj = self.decode(bytes)
self.assertEqual(type(obj), Spam)
self.assertTrue(hasattr(obj, 'baz'))
self.assertEqual(obj.baz, 'hello')
def test_complex_list(self):
x = datetime.datetime(2007, 11, 3, 8, 7, 37, 437000)
self.assertDecoded([['test','test','test','test']],
'\x0A\x00\x00\x00\x01\x0A\x00\x00\x00\x04\x02\x00\x04\x74\x65\x73'
'\x74\x02\x00\x04\x74\x65\x73\x74\x02\x00\x04\x74\x65\x73\x74\x02'
'\x00\x04\x74\x65\x73\x74')
self.assertDecoded([x], '\x0a\x00\x00\x00\x01\x0b\x42\x71\x60\x48\xcf'
'\xed\xd0\x00\x00\x00')
self.assertDecoded(
[[{u'a': u'spam', u'b': u'eggs'}, {u'a': u'spam', u'b': u'eggs'}]],
'\n\x00\x00\x00\x01\n\x00\x00\x00\x02\x08\x00\x00\x00\x00\x00\x01'
'a\x02\x00\x04spam\x00\x01b\x02\x00\x04eggs\x00\x00\t\x07\x00\x02')
self.assertDecoded([[1.0]], '\x0A\x00\x00\x00\x01\x0A\x00\x00\x00\x01'
'\x00\x3F\xF0\x00\x00\x00\x00\x00\x00')
def test_amf3(self):
self.buf.write('\x11\x04\x01')
self.buf.seek(0)
self.assertEqual(self.decoder.readElement(), 1)
def test_dynamic(self):
class Foo(pyamf.ASObject):
pass
x = Foo()
x.foo = 'bar'
alias = pyamf.register_class(Foo, 'x')
alias.exclude_attrs = ['hello']
self.assertDecoded(x, '\x10\x00\x01x\x00\x03foo\x02\x00\x03bar\x00'
'\x05hello\x02\x00\x05world\x00\x00\t')
def test_classic_class(self):
pyamf.register_class(ClassicSpam, 'spam.eggs')
self.buf.write('\x10\x00\tspam.eggs\x00\x03foo\x02\x00\x03bar\x00\x00\t')
self.buf.seek(0)
foo = self.decoder.readElement()
self.assertEqual(foo.foo, 'bar')
def test_not_strict(self):
self.assertFalse(self.decoder.strict)
# write a typed object to the stream
self.buf.write('\x10\x00\tspam.eggs\x00\x03foo\x02\x00\x03bar\x00\x00\t')
self.buf.seek(0)
self.assertFalse('spam.eggs' in pyamf.CLASS_CACHE)
obj = self.decoder.readElement()
self.assertTrue(isinstance(obj, pyamf.TypedObject))
self.assertEqual(obj.alias, 'spam.eggs')
self.assertEqual(obj, {'foo': 'bar'})
def test_strict(self):
self.decoder.strict = True
self.assertTrue(self.decoder.strict)
# write a typed object to the stream
self.buf.write('\x10\x00\tspam.eggs\x00\x03foo\x02\x00\x03bar\x00\x00\t')
self.buf.seek(0)
self.assertFalse('spam.eggs' in pyamf.CLASS_CACHE)
self.assertRaises(pyamf.UnknownClassAlias, self.decoder.readElement)
def test_slots(self):
class Person(object):
__slots__ = ('family_name', 'given_name')
self.buf.write('\x03\x00\x0bfamily_name\x02\x00\x03Doe\x00\n'
'given_name\x02\x00\x04Jane\x00\x00\t')
self.buf.seek(0)
foo = self.decoder.readElement()
self.assertEqual(foo.family_name, 'Doe')
self.assertEqual(foo.given_name, 'Jane')
def test_slots_registered(self):
class Person(object):
__slots__ = ('family_name', 'given_name')
pyamf.register_class(Person, 'spam.eggs.Person')
self.buf.write('\x10\x00\x10spam.eggs.Person\x00\x0bfamily_name\x02'
'\x00\x03Doe\x00\ngiven_name\x02\x00\x04Jane\x00\x00\t')
self.buf.seek(0)
foo = self.decoder.readElement()
self.assertTrue(isinstance(foo, Person))
self.assertEqual(foo.family_name, 'Doe')
self.assertEqual(foo.given_name, 'Jane')
def test_ioerror_buffer_position(self):
"""
Test to ensure that if an IOError is raised by `readElement` that
the original position of the stream is restored.
"""
bytes = pyamf.encode(u'foo', [1, 2, 3], encoding=pyamf.AMF0).getvalue()
self.buf.write(bytes[:-1])
self.buf.seek(0)
self.decoder.readElement()
self.assertEqual(self.buf.tell(), 6)
self.assertRaises(IOError, self.decoder.readElement)
self.assertEqual(self.buf.tell(), 6)
def test_timezone(self):
self.decoder.timezone_offset = datetime.timedelta(hours=-5)
self.buf.write('\x0bBr>\xc6\xf5w\x80\x00\x00\x00')
self.buf.seek(0)
f = self.decoder.readElement()
self.assertEqual(f, datetime.datetime(2009, 9, 24, 9, 23, 23))
def test_unsupported(self):
self.assertDecoded(None, '\x0D')
def test_bad_reference(self):
self.assertRaises(pyamf.ReferenceError, self.decode, '\x07\x00\x03')
def test_iterate(self):
self.assertRaises(StopIteration, self.decoder.next)
self.decoder.send('\x02\x00\x00')
self.decoder.send('\x02\x00\x05hello')
self.decoder.send('\x02\x00\t\xe1\x9a\xa0\xe1\x9b\x87\xe1\x9a\xbb')
self.assertEqual(self.decoder.next(), '')
self.assertEqual(self.decoder.next(), 'hello')
self.assertEqual(self.decoder.next(), u'\u16a0\u16c7\u16bb')
self.assertRaises(StopIteration, self.decoder.next)
self.assertIdentical(iter(self.decoder), self.decoder)
def test_bad_type(self):
self.assertRaises(pyamf.DecodeError, self.decode, '\xff')
def test_kwargs(self):
"""
Python <= 3 demand that kwargs keys be bytes instead of unicode/string.
"""
def f(**kwargs):
self.assertEqual(kwargs, {'a': 'a'})
kwargs = self.decode('\x03\x00\x01a\x02\x00\x01a\x00\x00\t')
f(**kwargs)
class RecordSetTestCase(unittest.TestCase, EncoderMixIn, DecoderMixIn):
"""
Tests for L{amf0.RecordSet}
"""
amf_type = pyamf.AMF0
blob = (
'\x10\x00\tRecordSet\x00\nserverInfo\x03', (
'\x00\x06cursor\x00?\xf0\x00\x00\x00\x00\x00\x00',
'\x00\x0bcolumnNames\n\x00\x00\x00\x03\x02\x00\x01a\x02\x00\x01b\x02\x00\x01c',
'\x00\x0binitialData\n\x00\x00\x00\x03\n\x00\x00\x00\x03\x00?\xf0'
'\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00'
'@\x08\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x03\x00@\x10\x00'
'\x00\x00\x00\x00\x00\x00@\x14\x00\x00\x00\x00\x00\x00\x00@\x18'
'\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x03\x00@\x1c\x00\x00'
'\x00\x00\x00\x00\x00@ \x00\x00\x00\x00\x00\x00\x00@"\x00\x00'
'\x00\x00\x00\x00',
'\x00\x07version\x00?\xf0\x00\x00\x00\x00\x00\x00',
'\x00\ntotalCount\x00@\x08\x00\x00\x00\x00\x00\x00'),
'\x00\x00\t\x00\x00\t')
def setUp(self):
unittest.TestCase.setUp(self)
EncoderMixIn.setUp(self)
DecoderMixIn.setUp(self)
def test_create(self):
x = amf0.RecordSet()
self.assertEqual(x.columns, [])
self.assertEqual(x.items, [])
self.assertEqual(x.service, None)
self.assertEqual(x.id, None)
x = amf0.RecordSet(columns=['spam', 'eggs'], items=[[1, 2]])
self.assertEqual(x.columns, ['spam', 'eggs'])
self.assertEqual(x.items, [[1, 2]])
self.assertEqual(x.service, None)
self.assertEqual(x.id, None)
x = amf0.RecordSet(service={}, id=54)
self.assertEqual(x.columns, [])
self.assertEqual(x.items, [])
self.assertEqual(x.service, {})
self.assertEqual(x.id, 54)
def test_server_info(self):
# empty recordset
x = amf0.RecordSet()
si = x.serverInfo
self.assertTrue(isinstance(si, dict))
self.assertEqual(si.cursor, 1)
self.assertEqual(si.version, 1)
self.assertEqual(si.columnNames, [])
self.assertEqual(si.initialData, [])
self.assertEqual(si.totalCount, 0)
try:
si.serviceName
except AttributeError:
pass
try:
si.id
except AttributeError:
pass
# basic create
x = amf0.RecordSet(columns=['a', 'b', 'c'], items=[
[1, 2, 3], [4, 5, 6], [7, 8, 9]])
si = x.serverInfo
self.assertTrue(isinstance(si, dict))
self.assertEqual(si.cursor, 1)
self.assertEqual(si.version, 1)
self.assertEqual(si.columnNames, ['a', 'b', 'c'])
self.assertEqual(si.initialData, [[1, 2, 3], [4, 5, 6], [7, 8, 9]])
self.assertEqual(si.totalCount, 3)
try:
si.serviceName
except AttributeError:
pass
try:
si.id
except AttributeError:
pass
# with service & id
service = {'name': 'baz'}
x = amf0.RecordSet(columns=['spam'], items=[['eggs']],
service=service, id='asdfasdf')
si = x.serverInfo
self.assertTrue(isinstance(si, dict))
self.assertEqual(si.cursor, 1)
self.assertEqual(si.version, 1)
self.assertEqual(si.columnNames, ['spam'])
self.assertEqual(si.initialData, [['eggs']])
self.assertEqual(si.totalCount, 1)
self.assertEqual(si.serviceName, 'baz')
self.assertEqual(si.id, 'asdfasdf')
def test_encode(self):
self.buf = self.encoder.stream
x = amf0.RecordSet(columns=['a', 'b', 'c'], items=[
[1, 2, 3], [4, 5, 6], [7, 8, 9]])
self.assertEncoded(x, self.blob)
def test_decode(self):
self.buf = self.decoder.stream
x = self.decode(self.blob)
self.assertTrue(isinstance(x, amf0.RecordSet))
self.assertEqual(x.columns, ['a', 'b', 'c'])
self.assertEqual(x.items, [[1, 2, 3], [4, 5, 6], [7, 8, 9]])
self.assertEqual(x.service, None)
self.assertEqual(x.id, None)
def test_repr(self):
x = amf0.RecordSet(columns=['spam'], items=[['eggs']],
service={'name': 'baz'}, id='asdfasdf')
self.assertEqual(repr(x), "<pyamf.amf0.RecordSet id=asdfasdf "
"service={'name': 'baz'} at 0x%x>" % (id(x),))
class ClassInheritanceTestCase(ClassCacheClearingTestCase, EncoderMixIn):
amf_type = pyamf.AMF0
def setUp(self):
# wtf
ClassCacheClearingTestCase.setUp(self)
EncoderMixIn.setUp(self)
def test_simple(self):
class A(object):
class __amf__:
static = ('a')
class B(A):
class __amf__:
static = ('b')
pyamf.register_class(A, 'A')
pyamf.register_class(B, 'B')
x = B()
x.a = 'spam'
x.b = 'eggs'
self.assertEncoded(x, '\x10\x00\x01B', ('\x00\x01a\x02\x00\x04spam',
'\x00\x01b\x02\x00\x04eggs'), '\x00\x00\t')
def test_deep(self):
class A(object):
class __amf__:
static = ('a')
class B(A):
class __amf__:
static = ('b')
class C(B):
class __amf__:
static = ('c')
pyamf.register_class(A, 'A')
pyamf.register_class(B, 'B')
pyamf.register_class(C, 'C')
x = C()
x.a = 'spam'
x.b = 'eggs'
x.c = 'foo'
self.assertEncoded(x, '\x10\x00\x01C', ('\x00\x01a\x02\x00\x04spam',
'\x00\x01c\x02\x00\x03foo', '\x00\x01b\x02\x00\x04eggs'),
'\x00\x00\t')
class ExceptionEncodingTestCase(ClassCacheClearingTestCase):
"""
Tests for encoding exceptions.
"""
def setUp(self):
ClassCacheClearingTestCase.setUp(self)
self.buffer = util.BufferedByteStream()
self.encoder = amf0.Encoder(self.buffer)
def test_exception(self):
try:
raise Exception('foo bar')
except Exception, e:
self.encoder.writeElement(e)
self.assertEqual(self.buffer.getvalue(), '\x03\x00\x07message\x02'
'\x00\x07foo bar\x00\x04name\x02\x00\tException\x00\x00\t')
def test_user_defined(self):
class FooBar(Exception):
pass
try:
raise FooBar('foo bar')
except Exception, e:
self.encoder.writeElement(e)
self.assertEqual(self.buffer.getvalue(), '\x03\x00\x07message\x02'
'\x00\x07foo bar\x00\x04name\x02\x00\x06FooBar\x00\x00\t')
def test_typed(self):
class XYZ(Exception):
pass
pyamf.register_class(XYZ, 'foo.bar')
try:
raise XYZ('blarg')
except Exception, e:
self.encoder.writeElement(e)
self.assertEqual(self.buffer.getvalue(), '\x10\x00\x07foo.bar\x00'
'\x07message\x02\x00\x05blarg\x00\x04name\x02\x00\x03XYZ\x00\x00\t')
class AMF0ContextTestCase(unittest.TestCase):
"""
"""
bytes = ('\x00\x03\x00\x02\x00\x0eServiceLicense\x00\x00\x00\x00O\x11\n\x0b'
'\x01-serviceConfigurationId\x06\t1234\x15licenseKey\x06Axxxxxxxxxxxxxx'
'xxxxxxxxxxxxxxxxxx\x01\x00\tSessionId\x00\x00\x00\x00\xb2\x11\n\x0b'
'\x01\x0bToken\x06\x82Iyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'
'yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'
'yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy\x01\x00\x01\x00\x0cRegi'
'sterUser\x00\x02/3\x00\x00\x01k\n\x00\x00\x00\x07\x11\n#\x01\rformat'
'\x0bvalue\x069urn:TribalDDB:identity:email\x06!tester@trial.com\x11\n'
'#\x01\x02\ttype\x06\x0fpasswrd\x06Kurn:TribalDDB:authentication:passwo'
'rd\x11\nS\x01\x19EmailAddress\x15PostalCode\x17DateOfBirth\x11LastName'
'\x13FirstName\x06\x06\x06\x0b12345\n3\x12\x0bmonth\x07day\tyear\x04'
'\x04\x04\x0f\x04\x8fF\x06\rewrwer\x06\x07wer\x11\n3\x1fSectionTracking'
'\tCsId\x11TrtmntId\x13LocalCsId\x04\x00\x04\x86\x94z\x04\x00\x11\n'
'\x13\x11Tracking\x07CTC\x06\x07555\x11\t\x03\x01\n#\x13UserOptIn\x1dli'
'veModeEnable\x05id\x02\x04\x884\x02\x00\x10wwwwwwwwwwwwwwww')
def test_decode(self):
from pyamf.remoting import decode
e = decode(self.bytes)
a, b, c, d, e, f, g = e['/3'].body
self.assertEqual(a, {'value': u'tester@trial.com',
'format': u'urn:TribalDDB:identity:email'})
self.assertEqual(b, {'type': u'urn:TribalDDB:authentication:password',
'value': u'passwrd'})
self.assertEqual(c, {'PostalCode': u'12345',
'DateOfBirth': {'month': 4, 'day': 15, 'year': 1990},
'EmailAddress': u'tester@trial.com',
'FirstName': u'wer',
'LastName': u'ewrwer'})
self.assertEqual(d, {'CsId': 0, 'TrtmntId': 100986, 'LocalCsId': 0})
self.assertEqual(e, {'CTC': u'555'})
self.assertEqual(f, [{'liveModeEnable': False, 'id': 1076}])
self.assertEqual(g, u'wwwwwwwwwwwwwwww')