"""Ganeti confd client
+Clients can use the confd client library to send requests to a group of master
+candidates running confd. The expected usage is through the asyncore framework,
+by sending queries, and asynchronously receiving replies through a callback.
+
+This way the client library doesn't ever need to "wait" on a particular answer,
+and can proceed even if some udp packets are lost. It's up to the user to
+reschedule queries if they haven't received responses and they need them.
+
+Example usage::
+
+ client = ConfdClient(...) # includes callback specification
+ req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
+ client.SendRequest(req)
+ # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
+ # ... wait ...
+ # And your callback will be called by asyncore, when your query gets a
+ # response, or when it expires.
+
+You can use the provided ConfdFilterCallback to act as a filter, only passing
+"newer" answer to your callback, and filtering out outdated ones, or ones
+confirming what you already got.
+
"""
+
import socket
import time
import random
through asyncore or with your own handling.
"""
- def __init__(self, hmac_key, peers, callback):
+ def __init__(self, hmac_key, peers, callback, port=None, logger=None):
"""Constructor for ConfdClient
@type hmac_key: string
@param peers: list of peer nodes
@type callback: f(L{ConfdUpcallPayload})
@param callback: function to call when getting answers
+ @type port: integer
+ @keyword port: confd port (default: use GetDaemonPort)
+ @type logger: logging.Logger
+ @keyword logger: optional logger for internal conditions
"""
- if not isinstance(peers, list):
- raise errors.ProgrammerError("peers must be a list")
if not callable(callback):
raise errors.ProgrammerError("callback must be callable")
- self._peers = peers
+ self.UpdatePeerList(peers)
self._hmac_key = hmac_key
self._socket = ConfdAsyncUDPClient(self)
self._callback = callback
+ self._confd_port = port
+ self._logger = logger
self._requests = {}
self._expire_requests = []
- self._confd_port = utils.GetDaemonPort(constants.CONFD)
+
+ if self._confd_port is None:
+ self._confd_port = utils.GetDaemonPort(constants.CONFD)
+
+ def UpdatePeerList(self, peers):
+ """Update the list of peers
+
+ @type peers: list
+ @param peers: list of peer nodes
+
+ """
+ if not isinstance(peers, list):
+ raise errors.ProgrammerError("peers must be a list")
+ self._peers = peers
def _PackRequest(self, request, now=None):
"""Prepare a request to be sent on the wire.
def _UnpackReply(self, payload):
in_payload = confd.UnpackMagic(payload)
- (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
+ (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
+ answer = objects.ConfdReply.FromDict(dict_answer)
return answer, salt
def ExpireRequests(self):
client_reply = ConfdUpcallPayload(salt=rsalt,
type=UPCALL_EXPIRE,
orig_request=request,
- extra_args=args)
+ extra_args=args,
+ client=self,
+ )
self._callback(client_reply)
else:
break
try:
try:
answer, salt = self._UnpackReply(payload)
- except (errors.SignatureError, errors.ConfdMagicError):
+ except (errors.SignatureError, errors.ConfdMagicError), err:
+ if self._logger:
+ self._logger.debug("Discarding broken package: %s" % err)
return
try:
(request, args) = self._requests[salt]
except KeyError:
- # If the salt is unkown the answer is probably a replay of an old
- # expired query. Ignoring it.
+ if self._logger:
+ self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
return
client_reply = ConfdUpcallPayload(salt=salt,
orig_request=request,
server_ip=ip,
server_port=port,
- extra_args=args)
+ extra_args=args,
+ client=self,
+ )
self._callback(client_reply)
finally:
@ivar server_port: answering server port
@type extra_args: any
@ivar extra_args: 'args' argument of the SendRequest function
+ @type client: L{ConfdClient}
+ @ivar client: current confd client instance
"""
__slots__ = [
"server_ip",
"server_port",
"extra_args",
+ "client",
]
if self.type not in constants.CONFD_REQS:
raise errors.ConfdClientError("Invalid request type")
+
+class ConfdFilterCallback:
+ """Callback that calls another callback, but filters duplicate results.
+
+ """
+ def __init__(self, callback, logger=None):
+ """Constructor for ConfdFilterCallback
+
+ @type callback: f(L{ConfdUpcallPayload})
+ @param callback: function to call when getting answers
+ @type logger: logging.Logger
+ @keyword logger: optional logger for internal conditions
+
+ """
+ if not callable(callback):
+ raise errors.ProgrammerError("callback must be callable")
+
+ self._callback = callback
+ self._logger = logger
+ # answers contains a dict of salt -> answer
+ self._answers = {}
+
+ def _LogFilter(self, salt, new_reply, old_reply):
+ if not self._logger:
+ return
+
+ if new_reply.serial > old_reply.serial:
+ self._logger.debug("Filtering confirming answer, with newer"
+ " serial for query %s" % salt)
+ elif new_reply.serial == old_reply.serial:
+ if new_reply.answer != old_reply.answer:
+ self._logger.warning("Got incoherent answers for query %s"
+ " (serial: %s)" % (salt, new_reply.serial))
+ else:
+ self._logger.debug("Filtering confirming answer, with same"
+ " serial for query %s" % salt)
+ else:
+ self._logger.debug("Filtering outdated answer for query %s"
+ " serial: (%d < %d)" % (salt, old_reply.serial,
+ new_reply.serial))
+
+ def _HandleExpire(self, up):
+ # if we have no answer we have received none, before the expiration.
+ if up.salt in self._answers:
+ del self._answers[up.salt]
+
+ def _HandleReply(self, up):
+ """Handle a single confd reply, and decide whether to filter it.
+
+ @rtype: boolean
+ @return: True if the reply should be filtered, False if it should be passed
+ on to the up-callback
+
+ """
+ filter_upcall = False
+ salt = up.salt
+ if salt not in self._answers:
+ # first answer for a query (don't filter, and record)
+ self._answers[salt] = up.server_reply
+ elif up.server_reply.serial > self._answers[salt].serial:
+ # newer answer (record, and compare contents)
+ old_answer = self._answers[salt]
+ self._answers[salt] = up.server_reply
+ if up.server_reply.answer == old_answer.answer:
+ # same content (filter) (version upgrade was unrelated)
+ filter_upcall = True
+ self._LogFilter(salt, up.server_reply, old_answer)
+ # else: different content, pass up a second answer
+ else:
+ # older or same-version answer (duplicate or outdated, filter)
+ filter_upcall = True
+ self._LogFilter(salt, up.server_reply, self._answers[salt])
+
+ return filter_upcall
+
+ def __call__(self, up):
+ """Filtering callback
+
+ @type up: L{ConfdUpcallPayload}
+ @param up: upper callback
+
+ """
+ filter_upcall = False
+ if up.type == UPCALL_REPLY:
+ filter_upcall = self._HandleReply(up)
+ elif up.type == UPCALL_EXPIRE:
+ self._HandleExpire(up)
+
+ if not filter_upcall:
+ self._callback(up)
+