Revision 96e03b0b

b/lib/confd/client.py
65 65
  through asyncore or with your own handling.
66 66

  
67 67
  """
68
  def __init__(self, hmac_key, peers):
68
  def __init__(self, hmac_key, peers, callback):
69 69
    """Constructor for ConfdClient
70 70

  
71 71
    @type hmac_key: string
72 72
    @param hmac_key: hmac key to talk to confd
73 73
    @type peers: list
74 74
    @param peers: list of peer nodes
75
    @type callback: f(L{ConfdUpcallPayload})
76
    @param callback: function to call when getting answers
75 77

  
76 78
    """
77 79
    if not isinstance(peers, list):
78 80
      raise errors.ProgrammerError("peers must be a list")
81
    if not callable(callback):
82
      raise errors.ProgrammerError("callback must be callable")
79 83

  
80 84
    self._peers = peers
81 85
    self._hmac_key = hmac_key
82 86
    self._socket = ConfdAsyncUDPClient(self)
83
    self._callbacks = {}
84
    self._expire_callbacks = []
87
    self._callback = callback
88
    self._requests = {}
89
    self._expire_requests = []
85 90
    self._confd_port = utils.GetDaemonPort(constants.CONFD)
86 91

  
87 92
  def _PackRequest(self, request, now=None):
......
102 107
    (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
103 108
    return answer, salt
104 109

  
105
  def _ExpireCallbacks(self):
106
    """Delete all the expired callbacks.
110
  def ExpireRequests(self):
111
    """Delete all the expired requests.
107 112

  
108 113
    """
109 114
    now = time.time()
110
    while self._expire_callbacks:
111
      expire_time, rsalt = self._expire_callbacks[0]
115
    while self._expire_requests:
116
      expire_time, rsalt = self._expire_requests[0]
112 117
      if now >= expire_time:
113
        self._expire_callbacks.pop()
114
        del self._callbacks[rsalt]
118
        self._expire_requests.pop(0)
119
        (request, args) = self._requests[rsalt]
120
        del self._requests[rsalt]
121
        client_reply = ConfdUpcallPayload(salt=rsalt,
122
                                          type=UPCALL_EXPIRE,
123
                                          orig_request=request,
124
                                          extra_args=args)
125
        self._callback(client_reply)
115 126
      else:
116 127
        break
117 128

  
118
  def SendRequest(self, request, callback, args=None, coverage=None):
129
  def SendRequest(self, request, args=None, coverage=None):
119 130
    """Send a confd request to some MCs
120 131

  
121 132
    @type request: L{objects.ConfdRequest}
122 133
    @param request: the request to send
123
    @type callback: f(answer, req_type, req_query, salt, ip, port, args)
124
    @param callback: answer callback
125 134
    @type args: tuple
126 135
    @keyword args: additional callback arguments
127 136
    @type coverage: integer
......
131 140
    if coverage is None:
132 141
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
133 142

  
134
    if not callable(callback):
135
      raise errors.ConfdClientError("callback must be callable")
136

  
137 143
    if coverage > len(self._peers):
138 144
      raise errors.ConfdClientError("Not enough MCs known to provide the"
139 145
                                    " desired coverage")
......
141 147
    if not request.rsalt:
142 148
      raise errors.ConfdClientError("Missing request rsalt")
143 149

  
144
    self._ExpireCallbacks()
145
    if request.rsalt in self._callbacks:
150
    self.ExpireRequests()
151
    if request.rsalt in self._requests:
146 152
      raise errors.ConfdClientError("Duplicate request rsalt")
147 153

  
148 154
    if request.type not in constants.CONFD_REQS:
......
160 166
      except errors.UdpDataSizeError:
161 167
        raise errors.ConfdClientError("Request too big")
162 168

  
163
    self._callbacks[request.rsalt] = (callback, request.type,
164
                                      request.query, args)
169
    self._requests[request.rsalt] = (request, args)
165 170
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
166
    self._expire_callbacks.append((expire_time, request.rsalt))
171
    self._expire_requests.append((expire_time, request.rsalt))
167 172

  
168 173
  def HandleResponse(self, payload, ip, port):
169 174
    """Asynchronous handler for a confd reply
......
178 183
        return
179 184

  
180 185
      try:
181
        (callback, type, query, args) = self._callbacks[salt]
186
        (request, args) = self._requests[salt]
182 187
      except KeyError:
183 188
        # If the salt is unkown the answer is probably a replay of an old
184 189
        # expired query. Ignoring it.
185
        pass
186
      else:
187
        callback(answer, type, query, salt, ip, port, args)
190
        return
191

  
192
      client_reply = ConfdUpcallPayload(salt=salt,
193
                                        type=UPCALL_REPLY,
194
                                        server_reply=answer,
195
                                        orig_request=request,
196
                                        server_ip=ip,
197
                                        server_port=port,
198
                                        extra_args=args)
199
      self._callback(client_reply)
188 200

  
189 201
    finally:
190
      self._ExpireCallbacks()
202
      self.ExpireRequests()
203

  
204

  
205
# UPCALL_REPLY: server reply upcall
206
# has all ConfdUpcallPayload fields populated
207
UPCALL_REPLY = 1
208
# UPCALL_EXPIRE: internal library request expire
209
# has only salt, type, orig_request and extra_args
210
UPCALL_EXPIRE = 2
211
CONFD_UPCALL_TYPES = frozenset([
212
  UPCALL_REPLY,
213
  UPCALL_EXPIRE,
214
  ])
215

  
216

  
217
class ConfdUpcallPayload(objects.ConfigObject):
218
  """Callback argument for confd replies
219

  
220
  @type salt: string
221
  @ivar salt: salt associated with the query
222
  @type type: one of confd.client.CONFD_UPCALL_TYPES
223
  @ivar type: upcall type (server reply, expired request, ...)
224
  @type orig_request: L{objects.ConfdRequest}
225
  @ivar orig_request: original request
226
  @type server_reply: L{objects.ConfdReply}
227
  @ivar server_reply: server reply
228
  @type server_ip: string
229
  @ivar server_ip: answering server ip address
230
  @type server_port: int
231
  @ivar server_port: answering server port
232
  @type extra_args: any
233
  @ivar extra_args: 'args' argument of the SendRequest function
234

  
235
  """
236
  __slots__ = [
237
    "salt",
238
    "type",
239
    "orig_request",
240
    "server_reply",
241
    "server_ip",
242
    "server_port",
243
    "extra_args",
244
    ]
191 245

  
192 246

  
193 247
class ConfdClientRequest(objects.ConfdRequest):

Also available in: Unified diff