X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/90469357969d0550410a59e733661feece6b45de..41ba40612c35f8c7510e7642fcbb0f3589e19234:/lib/confd/client.py diff --git a/lib/confd/client.py b/lib/confd/client.py index 0ceb6c7..e944431 100644 --- a/lib/confd/client.py +++ b/lib/confd/client.py @@ -21,6 +21,27 @@ """Ganeti confd client +Clients can use the confd client library to send requests to a group of master +candidates running confd. The expected usage is through the asyncore framework, +by sending queries, and asynchronously receiving replies through a callback. + +This way the client library doesn't ever need to "wait" on a particular answer, +and can proceed even if some UDP packets are lost. It's up to the user to +reschedule queries if they haven't received responses and they need them. + +Example usage: + client = ConfdClient(...) # includes callback specification + req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) + client.SendRequest(req) + # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run() + # ... wait ... + # And your callback will be called by asyncore, when your query gets a + # response, or when it expires. + +You can use the provided ConfdFilterCallback to act as a filter, only passing +"newer" answers to your callback, and filtering out outdated ones, or ones +confirming what you already got. + """ import socket import time @@ -65,24 +86,46 @@ class ConfdClient: through asyncore or with your own handling. 
""" - def __init__(self, hmac_key, peers): + def __init__(self, hmac_key, peers, callback, port=None, logger=None): """Constructor for ConfdClient @type hmac_key: string @param hmac_key: hmac key to talk to confd @type peers: list @param peers: list of peer nodes + @type callback: f(L{ConfdUpcallPayload}) + @param callback: function to call when getting answers + @type port: integer + @keyword port: confd port (default: use GetDaemonPort) + @type logger: L{logging.Logger} + @keyword logger: optional logger for internal conditions """ - if not isinstance(peers, list): - raise errors.ProgrammerError("peers must be a list") + if not callable(callback): + raise errors.ProgrammerError("callback must be callable") - self._peers = peers + self.UpdatePeerList(peers) self._hmac_key = hmac_key self._socket = ConfdAsyncUDPClient(self) - self._callbacks = {} - self._expire_callbacks = [] - self._confd_port = utils.GetDaemonPort(constants.CONFD) + self._callback = callback + self._confd_port = port + self._logger = logger + self._requests = {} + self._expire_requests = [] + + if self._confd_port is None: + self._confd_port = utils.GetDaemonPort(constants.CONFD) + + def UpdatePeerList(self, peers): + """Update the list of peers + + @type peers: list + @param peers: list of peer nodes + + """ + if not isinstance(peers, list): + raise errors.ProgrammerError("peers must be a list") + self._peers = peers def _PackRequest(self, request, now=None): """Prepare a request to be sent on the wire. @@ -99,29 +142,36 @@ class ConfdClient: def _UnpackReply(self, payload): in_payload = confd.UnpackMagic(payload) - (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) + (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) + answer = objects.ConfdReply.FromDict(dict_answer) return answer, salt - def _ExpireCallbacks(self): - """Delete all the expired callbacks. + def ExpireRequests(self): + """Delete all the expired requests. 
""" now = time.time() - while self._expire_callbacks: - expire_time, rsalt = self._expire_callbacks[0] + while self._expire_requests: + expire_time, rsalt = self._expire_requests[0] if now >= expire_time: - self._expire_callbacks.pop() - del self._callbacks[rsalt] + self._expire_requests.pop(0) + (request, args) = self._requests[rsalt] + del self._requests[rsalt] + client_reply = ConfdUpcallPayload(salt=rsalt, + type=UPCALL_EXPIRE, + orig_request=request, + extra_args=args, + client=self, + ) + self._callback(client_reply) else: break - def SendRequest(self, request, callback, args=None, coverage=None): + def SendRequest(self, request, args=None, coverage=None): """Send a confd request to some MCs @type request: L{objects.ConfdRequest} @param request: the request to send - @type callback: f(answer, req_type, req_query, salt, ip, port, args) - @param callback: answer callback @type args: tuple @keyword args: additional callback arguments @type coverage: integer @@ -131,9 +181,6 @@ class ConfdClient: if coverage is None: coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) - if not callable(callback): - raise errors.ConfdClientError("callback must be callable") - if coverage > len(self._peers): raise errors.ConfdClientError("Not enough MCs known to provide the" " desired coverage") @@ -141,8 +188,8 @@ class ConfdClient: if not request.rsalt: raise errors.ConfdClientError("Missing request rsalt") - self._ExpireCallbacks() - if request.rsalt in self._callbacks: + self.ExpireRequests() + if request.rsalt in self._requests: raise errors.ConfdClientError("Duplicate request rsalt") if request.type not in constants.CONFD_REQS: @@ -160,10 +207,9 @@ class ConfdClient: except errors.UdpDataSizeError: raise errors.ConfdClientError("Request too big") - self._callbacks[request.rsalt] = (callback, request.type, - request.query, args) + self._requests[request.rsalt] = (request, args) expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT - 
self._expire_callbacks.append((expire_time, request.rsalt)) + self._expire_requests.append((expire_time, request.rsalt)) def HandleResponse(self, payload, ip, port): """Asynchronous handler for a confd reply @@ -174,20 +220,76 @@ class ConfdClient: try: try: answer, salt = self._UnpackReply(payload) - except (errors.SignatureError, errors.ConfdMagicError): + except (errors.SignatureError, errors.ConfdMagicError), err: + if self._logger: + self._logger.debug("Discarding broken package: %s" % err) return try: - (callback, type, query, args) = self._callbacks[salt] + (request, args) = self._requests[salt] except KeyError: - # If the salt is unkown the answer is probably a replay of an old - # expired query. Ignoring it. - pass - else: - callback(answer, type, query, salt, ip, port, args) + if self._logger: + self._logger.debug("Discarding unknown (expired?) reply: %s" % err) + return + + client_reply = ConfdUpcallPayload(salt=salt, + type=UPCALL_REPLY, + server_reply=answer, + orig_request=request, + server_ip=ip, + server_port=port, + extra_args=args, + client=self, + ) + self._callback(client_reply) finally: - self._ExpireCallbacks() + self.ExpireRequests() + + +# UPCALL_REPLY: server reply upcall +# has all ConfdUpcallPayload fields populated +UPCALL_REPLY = 1 +# UPCALL_EXPIRE: internal library request expire +# has only salt, type, orig_request, extra_args and client +UPCALL_EXPIRE = 2 +CONFD_UPCALL_TYPES = frozenset([ + UPCALL_REPLY, + UPCALL_EXPIRE, + ]) + + +class ConfdUpcallPayload(objects.ConfigObject): + """Callback argument for confd replies + + @type salt: string + @ivar salt: salt associated with the query + @type type: one of confd.client.CONFD_UPCALL_TYPES + @ivar type: upcall type (server reply, expired request, ...) 
+ @type orig_request: L{objects.ConfdRequest} + @ivar orig_request: original request + @type server_reply: L{objects.ConfdReply} + @ivar server_reply: server reply + @type server_ip: string + @ivar server_ip: answering server ip address + @type server_port: int + @ivar server_port: answering server port + @type extra_args: any + @ivar extra_args: 'args' argument of the SendRequest function + @type client: L{ConfdClient} + @ivar client: current confd client instance + + """ + __slots__ = [ + "salt", + "type", + "orig_request", + "server_reply", + "server_ip", + "server_port", + "extra_args", + "client", + ] class ConfdClientRequest(objects.ConfdRequest): @@ -206,3 +308,94 @@ class ConfdClientRequest(objects.ConfdRequest): if self.type not in constants.CONFD_REQS: raise errors.ConfdClientError("Invalid request type") + +class ConfdFilterCallback: + """Callback that calls another callback, but filters duplicate results. + + """ + def __init__(self, callback, logger=None): + """Constructor for ConfdFilterCallback + + @type callback: f(L{ConfdUpcallPayload}) + @param callback: function to call when getting answers + @type logger: L{logging.Logger} + @keyword logger: optional logger for internal conditions + + """ + if not callable(callback): + raise errors.ProgrammerError("callback must be callable") + + self._callback = callback + self._logger = logger + # answers contains a dict of salt -> answer + self._answers = {} + + def _LogFilter(self, salt, new_reply, old_reply): + if not self._logger: + return + + if new_reply.serial > old_reply.serial: + self._logger.debug("Filtering confirming answer, with newer" + " serial for query %s" % salt) + elif new_reply.serial == old_reply.serial: + if new_reply.answer != old_reply.answer: + self._logger.warning("Got incoherent answers for query %s" + " (serial: %s)" % (salt, new_reply.serial)) + else: + self._logger.debug("Filtering confirming answer, with same" + " serial for query %s" % salt) + else: + 
self._logger.debug("Filtering outdated answer for query %s" + " serial: (%d < %d)" % (salt, old_reply.serial, + new_reply.serial)) + + def _HandleExpire(self, up): + # if no answer is recorded for this salt, none was received before the + # expiration, and there is nothing to forget. + if up.salt in self._answers: + del self._answers[up.salt] + + def _HandleReply(self, up): + """Handle a single confd reply, and decide whether to filter it. + + @rtype: boolean + @return: True if the reply should be filtered, False if it should be passed + on to the up-callback + + """ + filter_upcall = False + salt = up.salt + if salt not in self._answers: + # first answer for a query (don't filter, and record) + self._answers[salt] = up.server_reply + elif up.server_reply.serial > self._answers[salt].serial: + # newer answer (record, and compare contents) + old_answer = self._answers[salt] + self._answers[salt] = up.server_reply + if up.server_reply.answer == old_answer.answer: + # same content (filter) (version upgrade was unrelated) + filter_upcall = True + self._LogFilter(salt, up.server_reply, old_answer) + # else: different content, pass up a second answer + else: + # older or same-version answer (duplicate or outdated, filter) + filter_upcall = True + self._LogFilter(salt, up.server_reply, self._answers[salt]) + + return filter_upcall + + def __call__(self, up): + """Filtering callback + + @type up: L{ConfdUpcallPayload} + @param up: upcall payload (server reply or request expiration) + + """ + filter_upcall = False + if up.type == UPCALL_REPLY: + filter_upcall = self._HandleReply(up) + elif up.type == UPCALL_EXPIRE: + self._HandleExpire(up) + + if not filter_upcall: + self._callback(up) +