instance = object.__new__(cls)
instance._lock = Lock()
instance._id2rpc = {}
- instance._pipelined = session.can_pipeline
+ #instance._pipelined = session.can_pipeline
session.add_listener(instance)
return instance
tag, attrs = root
if xml_.unqualify(tag) != 'rpc-reply':
return
- rpc = None
- for key in attrs:
- if xml_.unqualify(key) == 'message-id':
- id = attrs[key]
+ for key in attrs: # in the <rpc-reply> attributes
+ if xml_.unqualify(key) == 'message-id': # if we found msgid attr
+ id = attrs[key] # get the msgid
try:
with self._lock:
- rpc = self._id2rpc.pop(id)
+ rpc = self._id2rpc.get(id) # the corresponding rpc
+ logger.debug('delivering to %r' % rpc)
+ rpc.deliver_reply(raw)
except KeyError:
- logger.warning('no object registered for message-id: [%s]' % id)
- except Exception as e:
- logger.debug('error - %r' % e)
- break
+ raise OperationError('Unknown message-id: %s', id)
+ # other exceptions are deliberately not caught: fail loudly if we must
+ else:
+ # delivery succeeded, so the reference to the RPC can be dropped
+ del self._id2rpc[id]
+ break
else:
- if not self._pipelined:
- with self._lock:
- assert(len(self._id2rpc) == 1)
- rpc = self._id2rpc.values()[0]
- self._id2rpc.clear()
- else:
- logger.warning('<rpc-reply> without message-id received: %s' % raw)
- logger.debug('delivering to %r' % rpc)
- rpc.deliver_reply(raw)
-
+ raise OperationError('Could not find "message-id" attribute in <rpc-reply>')
+
def errback(self, err):
try:
for rpc in self._id2rpc.values():
def _request(self, op):
"""Subclasses call this method to make the RPC request.
-
- In asynchronous mode, returns an :class:`~threading.Event` which is set
- when the reply has been received or an error occured. It is prudent,
- therefore, to check the :attr:`error` attribute before accesing
- :attr:`reply`.
-
- Otherwise, waits until the reply is received and returns
+
+ In synchronous mode, waits until the reply is received and returns
:class:`RPCReply`.
-
+
+ In asynchronous mode, returns immediately, returning a reference to this
+ object. The :attr:`event` attribute will be set when the reply has been
+ received (see :attr:`reply`) or an error occurred (see :attr:`error`).
+
:arg opspec: :ref:`dtree` for the operation
:type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
- :rtype: :class:`~threading.Event` or :class:`RPCReply`
+ :rtype: :class:`RPCReply` (sync) or :class:`RPC` (async)
"""
logger.debug('request %r with opsepc=%r' % (self, op))
req = self._build(op)
self._session.send(req)
if self._async:
- logger.debug('async, returning event')
- return self._event
+ logger.debug('async, returning')
+ return self
else:
logger.debug('sync, will wait for timeout=%r' % self._timeout)
self._event.wait(self._timeout)
if capability not in self._session.server_capabilities:
raise MissingCapabilityError('Server does not support [%s]' %
capability)
-
+
def deliver_reply(self, raw):
# internal use
self._reply = self.REPLY_CLS(raw)
- self._delivery_hook()
+ #self._delivery_hook() -- usecase?!
self._event.set()
def deliver_error(self, err):
# internal use
self._error = err
- self._delivery_hook()
+ #self._delivery_hook() -- usecase?!
self._event.set()
@property
@property
def error(self):
""":exc:`Exception` type if an error occured or :const:`None`.
-
- This attribute should be checked if the request was made asynchronously,
- so that it can be determined if :attr:`event` being set is because of a
- reply or error.
-
+
.. note::
This represents an error which prevented a reply from being
received. An *<rpc-error>* does not fall in that category -- see