blob: 100435bcbc5c0b384295afb5df4abbec25f2e160 [file] [log] [blame]
# Copyright (c) The PyAMF Project.
# See LICENSE.txt for details.
"""
Flex Messaging implementation.
This module contains the message classes used with Flex Data Services.
@see: U{RemoteObject on OSFlash (external)
<http://osflash.org/documentation/amf3#remoteobject>}
@since: 0.1
"""
import uuid
import pyamf.util
from pyamf import amf3
__all__ = [
'RemotingMessage',
'CommandMessage',
'AcknowledgeMessage',
'ErrorMessage',
'AbstractMessage',
'AsyncMessage'
]
NAMESPACE = 'flex.messaging.messages'
SMALL_FLAG_MORE = 0x80
class AbstractMessage(object):
"""
Abstract base class for all Flex messages.
Messages have two customizable sections; headers and data. The headers
property provides access to specialized meta information for a specific
message instance. The data property contains the instance specific data
that needs to be delivered and processed by the decoder.
@see: U{AbstractMessage on Livedocs<http://
livedocs.adobe.com/flex/201/langref/mx/messaging/messages/AbstractMessage.html>}
@ivar body: Specific data that needs to be delivered to the remote
destination.
@type body: C{mixed}
@ivar clientId: Indicates which client sent the message.
@type clientId: C{str}
@ivar destination: Message destination.
@type destination: C{str}
@ivar headers: Message headers. Core header names start with DS.
@type headers: C{dict}
@ivar messageId: Unique Message ID.
@type messageId: C{str}
@ivar timeToLive: How long the message should be considered valid and
deliverable.
@type timeToLive: C{int}
@ivar timestamp: Timestamp when the message was generated.
@type timestamp: C{int}
"""
class __amf__:
amf3 = True
static = ('body', 'clientId', 'destination', 'headers', 'messageId',
'timestamp', 'timeToLive')
#: Each message pushed from the server will contain this header identifying
#: the client that will receive the message.
DESTINATION_CLIENT_ID_HEADER = "DSDstClientId"
#: Messages are tagged with the endpoint id for the channel they are sent
#: over.
ENDPOINT_HEADER = "DSEndpoint"
#: Messages that need to set remote credentials for a destination carry the
#: C{Base64} encoded credentials in this header.
REMOTE_CREDENTIALS_HEADER = "DSRemoteCredentials"
#: The request timeout value is set on outbound messages by services or
#: channels and the value controls how long the responder will wait for an
#: acknowledgement, result or fault response for the message before timing
#: out the request.
REQUEST_TIMEOUT_HEADER = "DSRequestTimeout"
SMALL_ATTRIBUTE_FLAGS = [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40]
SMALL_ATTRIBUTES = dict(zip(
SMALL_ATTRIBUTE_FLAGS,
__amf__.static
))
SMALL_UUID_FLAGS = [0x01, 0x02]
SMALL_UUIDS = dict(zip(
SMALL_UUID_FLAGS,
['clientId', 'messageId']
))
def __new__(cls, *args, **kwargs):
obj = object.__new__(cls)
obj.__init__(*args, **kwargs)
return obj
def __init__(self, *args, **kwargs):
self.body = kwargs.get('body', None)
self.clientId = kwargs.get('clientId', None)
self.destination = kwargs.get('destination', None)
self.headers = kwargs.get('headers', {})
self.messageId = kwargs.get('messageId', None)
self.timestamp = kwargs.get('timestamp', None)
self.timeToLive = kwargs.get('timeToLive', None)
def __repr__(self):
m = '<%s ' % self.__class__.__name__
for k in self.__dict__:
m += ' %s=%r' % (k, getattr(self, k))
return m + " />"
def decodeSmallAttribute(self, attr, input):
"""
@since: 0.5
"""
obj = input.readObject()
if attr in ['timestamp', 'timeToLive']:
return pyamf.util.get_datetime(obj / 1000.0)
return obj
def encodeSmallAttribute(self, attr):
"""
@since: 0.5
"""
obj = getattr(self, attr)
if not obj:
return obj
if attr in ['timestamp', 'timeToLive']:
return pyamf.util.get_timestamp(obj) * 1000.0
elif attr in ['clientId', 'messageId']:
if isinstance(obj, uuid.UUID):
return None
return obj
def __readamf__(self, input):
flags = read_flags(input)
if len(flags) > 2:
raise pyamf.DecodeError('Expected <=2 (got %d) flags for the '
'AbstractMessage portion of the small message for %r' % (
len(flags), self.__class__))
for index, byte in enumerate(flags):
if index == 0:
for flag in self.SMALL_ATTRIBUTE_FLAGS:
if flag & byte:
attr = self.SMALL_ATTRIBUTES[flag]
setattr(self, attr, self.decodeSmallAttribute(attr, input))
elif index == 1:
for flag in self.SMALL_UUID_FLAGS:
if flag & byte:
attr = self.SMALL_UUIDS[flag]
setattr(self, attr, decode_uuid(input.readObject()))
def __writeamf__(self, output):
flag_attrs = []
uuid_attrs = []
byte = 0
for flag in self.SMALL_ATTRIBUTE_FLAGS:
value = self.encodeSmallAttribute(self.SMALL_ATTRIBUTES[flag])
if value:
byte |= flag
flag_attrs.append(value)
flags = byte
byte = 0
for flag in self.SMALL_UUID_FLAGS:
attr = self.SMALL_UUIDS[flag]
value = getattr(self, attr)
if not value:
continue
byte |= flag
uuid_attrs.append(amf3.ByteArray(value.bytes))
if not byte:
output.writeUnsignedByte(flags)
else:
output.writeUnsignedByte(flags | SMALL_FLAG_MORE)
output.writeUnsignedByte(byte)
[output.writeObject(attr) for attr in flag_attrs]
[output.writeObject(attr) for attr in uuid_attrs]
def getSmallMessage(self):
"""
Return a ISmallMessage representation of this object. If one is not
available, L{NotImplementedError} will be raised.
@since: 0.5
"""
raise NotImplementedError
class AsyncMessage(AbstractMessage):
"""
I am the base class for all asynchronous Flex messages.
@see: U{AsyncMessage on Livedocs<http://
livedocs.adobe.com/flex/201/langref/mx/messaging/messages/AsyncMessage.html>}
@ivar correlationId: Correlation id of the message.
@type correlationId: C{str}
"""
#: Messages that were sent with a defined subtopic property indicate their
#: target subtopic in this header.
SUBTOPIC_HEADER = "DSSubtopic"
class __amf__:
static = ('correlationId',)
def __init__(self, *args, **kwargs):
AbstractMessage.__init__(self, *args, **kwargs)
self.correlationId = kwargs.get('correlationId', None)
def __readamf__(self, input):
AbstractMessage.__readamf__(self, input)
flags = read_flags(input)
if len(flags) > 1:
raise pyamf.DecodeError('Expected <=1 (got %d) flags for the '
'AsyncMessage portion of the small message for %r' % (
len(flags), self.__class__))
byte = flags[0]
if byte & 0x01:
self.correlationId = input.readObject()
if byte & 0x02:
self.correlationId = decode_uuid(input.readObject())
def __writeamf__(self, output):
AbstractMessage.__writeamf__(self, output)
if not isinstance(self.correlationId, uuid.UUID):
output.writeUnsignedByte(0x01)
output.writeObject(self.correlationId)
else:
output.writeUnsignedByte(0x02)
output.writeObject(pyamf.amf3.ByteArray(self.correlationId.bytes))
def getSmallMessage(self):
"""
Return a ISmallMessage representation of this async message.
@since: 0.5
"""
return AsyncMessageExt(**self.__dict__)
class AcknowledgeMessage(AsyncMessage):
"""
I acknowledge the receipt of a message that was sent previously.
Every message sent within the messaging system must receive an
acknowledgement.
@see: U{AcknowledgeMessage on Livedocs<http://
livedocs.adobe.com/flex/201/langref/mx/messaging/messages/AcknowledgeMessage.html>}
"""
#: Used to indicate that the acknowledgement is for a message that
#: generated an error.
ERROR_HINT_HEADER = "DSErrorHint"
def __readamf__(self, input):
AsyncMessage.__readamf__(self, input)
flags = read_flags(input)
if len(flags) > 1:
raise pyamf.DecodeError('Expected <=1 (got %d) flags for the '
'AcknowledgeMessage portion of the small message for %r' % (
len(flags), self.__class__))
def __writeamf__(self, output):
AsyncMessage.__writeamf__(self, output)
output.writeUnsignedByte(0)
def getSmallMessage(self):
"""
Return a ISmallMessage representation of this acknowledge message.
@since: 0.5
"""
return AcknowledgeMessageExt(**self.__dict__)
class CommandMessage(AsyncMessage):
"""
Provides a mechanism for sending commands related to publish/subscribe
messaging, ping, and cluster operations.
@see: U{CommandMessage on Livedocs<http://
livedocs.adobe.com/flex/201/langref/mx/messaging/messages/CommandMessage.html>}
@ivar operation: The command
@type operation: C{int}
@ivar messageRefType: hmm, not sure about this one.
@type messageRefType: C{str}
"""
#: The server message type for authentication commands.
AUTHENTICATION_MESSAGE_REF_TYPE = "flex.messaging.messages.AuthenticationMessage"
#: This is used to test connectivity over the current channel to the remote
#: endpoint.
PING_OPERATION = 5
#: This is used by a remote destination to sync missed or cached messages
#: back to a client as a result of a client issued poll command.
SYNC_OPERATION = 4
#: This is used to request a list of failover endpoint URIs for the remote
#: destination based on cluster membership.
CLUSTER_REQUEST_OPERATION = 7
#: This is used to send credentials to the endpoint so that the user can be
#: logged in over the current channel. The credentials need to be C{Base64}
#: encoded and stored in the body of the message.
LOGIN_OPERATION = 8
#: This is used to log the user out of the current channel, and will
#: invalidate the server session if the channel is HTTP based.
LOGOUT_OPERATION = 9
#: This is used to poll a remote destination for pending, undelivered
#: messages.
POLL_OPERATION = 2
#: Subscribe commands issued by a consumer pass the consumer's C{selector}
#: expression in this header.
SELECTOR_HEADER = "DSSelector"
#: This is used to indicate that the client's session with a remote
#: destination has timed out.
SESSION_INVALIDATE_OPERATION = 10
#: This is used to subscribe to a remote destination.
SUBSCRIBE_OPERATION = 0
#: This is the default operation for new L{CommandMessage} instances.
UNKNOWN_OPERATION = 1000
#: This is used to unsubscribe from a remote destination.
UNSUBSCRIBE_OPERATION = 1
#: This operation is used to indicate that a channel has disconnected.
DISCONNECT_OPERATION = 12
class __amf__:
static = ('operation',)
def __init__(self, *args, **kwargs):
AsyncMessage.__init__(self, *args, **kwargs)
self.operation = kwargs.get('operation', None)
def __readamf__(self, input):
AsyncMessage.__readamf__(self, input)
flags = read_flags(input)
if not flags:
return
if len(flags) > 1:
raise pyamf.DecodeError('Expected <=1 (got %d) flags for the '
'CommandMessage portion of the small message for %r' % (
len(flags), self.__class__))
byte = flags[0]
if byte & 0x01:
self.operation = input.readObject()
def __writeamf__(self, output):
AsyncMessage.__writeamf__(self, output)
if self.operation:
output.writeUnsignedByte(0x01)
output.writeObject(self.operation)
else:
output.writeUnsignedByte(0)
def getSmallMessage(self):
"""
Return a ISmallMessage representation of this command message.
@since: 0.5
"""
return CommandMessageExt(**self.__dict__)
class ErrorMessage(AcknowledgeMessage):
"""
I am the Flex error message to be returned to the client.
This class is used to report errors within the messaging system.
@see: U{ErrorMessage on Livedocs<http://
livedocs.adobe.com/flex/201/langref/mx/messaging/messages/ErrorMessage.html>}
"""
#: If a message may not have been delivered, the faultCode will contain
#: this constant.
MESSAGE_DELIVERY_IN_DOUBT = "Client.Error.DeliveryInDoubt"
#: Header name for the retryable hint header.
#:
#: This is used to indicate that the operation that generated the error may
#: be retryable rather than fatal.
RETRYABLE_HINT_HEADER = "DSRetryableErrorHint"
class __amf__:
static = ('extendedData', 'faultCode', 'faultDetail', 'faultString',
'rootCause')
def __init__(self, *args, **kwargs):
AcknowledgeMessage.__init__(self, *args, **kwargs)
#: Extended data that the remote destination has chosen to associate
#: with this error to facilitate custom error processing on the client.
self.extendedData = kwargs.get('extendedData', {})
#: Fault code for the error.
self.faultCode = kwargs.get('faultCode', None)
#: Detailed description of what caused the error.
self.faultDetail = kwargs.get('faultDetail', None)
#: A simple description of the error.
self.faultString = kwargs.get('faultString', None)
#: Should a traceback exist for the error, this property contains the
#: message.
self.rootCause = kwargs.get('rootCause', {})
def getSmallMessage(self):
"""
Return a ISmallMessage representation of this error message.
@since: 0.5
"""
raise NotImplementedError
class RemotingMessage(AbstractMessage):
"""
I am used to send RPC requests to a remote endpoint.
@see: U{RemotingMessage on Livedocs<http://
livedocs.adobe.com/flex/201/langref/mx/messaging/messages/RemotingMessage.html>}
"""
class __amf__:
static = ('operation', 'source')
def __init__(self, *args, **kwargs):
AbstractMessage.__init__(self, *args, **kwargs)
#: Name of the remote method/operation that should be called.
self.operation = kwargs.get('operation', None)
#: Name of the service to be called including package name.
#: This property is provided for backwards compatibility.
self.source = kwargs.get('source', None)
class AcknowledgeMessageExt(AcknowledgeMessage):
"""
An L{AcknowledgeMessage}, but implementing C{ISmallMessage}.
@since: 0.5
"""
class __amf__:
external = True
class CommandMessageExt(CommandMessage):
"""
A L{CommandMessage}, but implementing C{ISmallMessage}.
@since: 0.5
"""
class __amf__:
external = True
class AsyncMessageExt(AsyncMessage):
"""
A L{AsyncMessage}, but implementing C{ISmallMessage}.
@since: 0.5
"""
class __amf__:
external = True
def read_flags(input):
"""
@since: 0.5
"""
flags = []
done = False
while not done:
byte = input.readUnsignedByte()
if not byte & SMALL_FLAG_MORE:
done = True
else:
byte = byte ^ SMALL_FLAG_MORE
flags.append(byte)
return flags
def decode_uuid(obj):
"""
Decode a L{ByteArray} contents to a C{uuid.UUID} instance.
@since: 0.5
"""
return uuid.UUID(bytes=str(obj))
pyamf.register_package(globals(), package=NAMESPACE)
pyamf.register_class(AcknowledgeMessageExt, 'DSK')
pyamf.register_class(CommandMessageExt, 'DSC')
pyamf.register_class(AsyncMessageExt, 'DSA')