Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ e3ac8406

History | View | Annotate | Download (19.7 kB)

1 2f31098c Iustin Pop
#
2 a8083063 Iustin Pop
#
3 a8083063 Iustin Pop
4 d2cd6944 Iustin Pop
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 a8083063 Iustin Pop
#
6 a8083063 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 a8083063 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 a8083063 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 a8083063 Iustin Pop
# (at your option) any later version.
10 a8083063 Iustin Pop
#
11 a8083063 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 a8083063 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 a8083063 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 a8083063 Iustin Pop
# General Public License for more details.
15 a8083063 Iustin Pop
#
16 a8083063 Iustin Pop
# You should have received a copy of the GNU General Public License
17 a8083063 Iustin Pop
# along with this program; if not, write to the Free Software
18 a8083063 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 a8083063 Iustin Pop
# 02110-1301, USA.
20 a8083063 Iustin Pop
21 a8083063 Iustin Pop
22 3ef3c771 Iustin Pop
"""Inter-node RPC library.
23 a8083063 Iustin Pop

24 a8083063 Iustin Pop
"""
25 a8083063 Iustin Pop
26 b459a848 Andrea Spadaccini
# pylint: disable=C0103,R0201,R0904
27 72737a7f Iustin Pop
# C0103: Invalid name, since call_ are not valid
28 72737a7f Iustin Pop
# R0201: Method could be a function, we keep all rpcs instance methods
29 72737a7f Iustin Pop
# as not to change them back and forth between static/instance methods
30 72737a7f Iustin Pop
# if they need to start using instance attributes
31 72737a7f Iustin Pop
# R0904: Too many public methods
32 a8083063 Iustin Pop
33 a8083063 Iustin Pop
import os
34 58b311ca Iustin Pop
import logging
35 12bce260 Michael Hanselmann
import zlib
36 12bce260 Michael Hanselmann
import base64
37 33231500 Michael Hanselmann
import pycurl
38 33231500 Michael Hanselmann
import threading
39 a8083063 Iustin Pop
40 a8083063 Iustin Pop
from ganeti import utils
41 a8083063 Iustin Pop
from ganeti import objects
42 ecfe9491 Michael Hanselmann
from ganeti import http
43 7c28c575 Michael Hanselmann
from ganeti import serializer
44 eafd8762 Michael Hanselmann
from ganeti import constants
45 781de953 Iustin Pop
from ganeti import errors
46 a744b676 Manuel Franceschini
from ganeti import netutils
47 eb202c13 Manuel Franceschini
from ganeti import ssconf
48 9a914f7a René Nussbaumer
from ganeti import runtime
49 00267bfe Michael Hanselmann
from ganeti import compat
50 a8083063 Iustin Pop
51 200de241 Michael Hanselmann
# Special module generated at build time
52 200de241 Michael Hanselmann
from ganeti import _generated_rpc
53 200de241 Michael Hanselmann
54 fe267188 Iustin Pop
# pylint has a bug here, doesn't see this import
55 b459a848 Andrea Spadaccini
import ganeti.http.client  # pylint: disable=W0611
56 ae88ef45 Michael Hanselmann
57 a8083063 Iustin Pop
58 33231500 Michael Hanselmann
# Timeout for connecting to nodes (seconds)
59 33231500 Michael Hanselmann
_RPC_CONNECT_TIMEOUT = 5
60 33231500 Michael Hanselmann
61 33231500 Michael Hanselmann
_RPC_CLIENT_HEADERS = [
62 33231500 Michael Hanselmann
  "Content-type: %s" % http.HTTP_APP_JSON,
63 8e29563f Iustin Pop
  "Expect:",
64 33231500 Michael Hanselmann
  ]
65 4331f6cd Michael Hanselmann
66 92fd2250 Iustin Pop
# Various time constants for the timeout table
67 92fd2250 Iustin Pop
_TMO_URGENT = 60 # one minute
68 92fd2250 Iustin Pop
_TMO_FAST = 5 * 60 # five minutes
69 92fd2250 Iustin Pop
_TMO_NORMAL = 15 * 60 # 15 minutes
70 92fd2250 Iustin Pop
_TMO_SLOW = 3600 # one hour
71 92fd2250 Iustin Pop
_TMO_4HRS = 4 * 3600
72 92fd2250 Iustin Pop
_TMO_1DAY = 86400
73 92fd2250 Iustin Pop
74 00267bfe Michael Hanselmann
#: Special value to describe an offline host
75 00267bfe Michael Hanselmann
_OFFLINE = object()
76 00267bfe Michael Hanselmann
77 4331f6cd Michael Hanselmann
78 4331f6cd Michael Hanselmann
def Init():
79 4331f6cd Michael Hanselmann
  """Initializes the module-global HTTP client manager.
80 4331f6cd Michael Hanselmann

81 33231500 Michael Hanselmann
  Must be called before using any RPC function and while exactly one thread is
82 33231500 Michael Hanselmann
  running.
83 4331f6cd Michael Hanselmann

84 4331f6cd Michael Hanselmann
  """
85 33231500 Michael Hanselmann
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86 33231500 Michael Hanselmann
  # one thread running. This check is just a safety measure -- it doesn't
87 33231500 Michael Hanselmann
  # cover all cases.
88 33231500 Michael Hanselmann
  assert threading.activeCount() == 1, \
89 33231500 Michael Hanselmann
         "Found more than one active thread when initializing pycURL"
90 4331f6cd Michael Hanselmann
91 33231500 Michael Hanselmann
  logging.info("Using PycURL %s", pycurl.version)
92 8d0a4f99 Michael Hanselmann
93 33231500 Michael Hanselmann
  pycurl.global_init(pycurl.GLOBAL_ALL)
94 4331f6cd Michael Hanselmann
95 4331f6cd Michael Hanselmann
96 4331f6cd Michael Hanselmann
def Shutdown():
97 4331f6cd Michael Hanselmann
  """Stops the module-global HTTP client manager.
98 4331f6cd Michael Hanselmann

99 33231500 Michael Hanselmann
  Must be called before quitting the program and while exactly one thread is
100 33231500 Michael Hanselmann
  running.
101 4331f6cd Michael Hanselmann

102 4331f6cd Michael Hanselmann
  """
103 33231500 Michael Hanselmann
  pycurl.global_cleanup()
104 33231500 Michael Hanselmann
105 33231500 Michael Hanselmann
106 33231500 Michael Hanselmann
def _ConfigRpcCurl(curl):
107 33231500 Michael Hanselmann
  noded_cert = str(constants.NODED_CERT_FILE)
108 4331f6cd Michael Hanselmann
109 33231500 Michael Hanselmann
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110 33231500 Michael Hanselmann
  curl.setopt(pycurl.CAINFO, noded_cert)
111 33231500 Michael Hanselmann
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112 33231500 Michael Hanselmann
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113 33231500 Michael Hanselmann
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114 33231500 Michael Hanselmann
  curl.setopt(pycurl.SSLCERT, noded_cert)
115 33231500 Michael Hanselmann
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116 33231500 Michael Hanselmann
  curl.setopt(pycurl.SSLKEY, noded_cert)
117 33231500 Michael Hanselmann
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118 33231500 Michael Hanselmann
119 33231500 Michael Hanselmann
120 e0e916fe Iustin Pop
def RunWithRPC(fn):
121 e0e916fe Iustin Pop
  """RPC-wrapper decorator.
122 e0e916fe Iustin Pop

123 e0e916fe Iustin Pop
  When applied to a function, it runs it with the RPC system
124 e0e916fe Iustin Pop
  initialized, and it shutsdown the system afterwards. This means the
125 e0e916fe Iustin Pop
  function must be called without RPC being initialized.
126 e0e916fe Iustin Pop

127 e0e916fe Iustin Pop
  """
128 e0e916fe Iustin Pop
  def wrapper(*args, **kwargs):
129 e0e916fe Iustin Pop
    Init()
130 e0e916fe Iustin Pop
    try:
131 e0e916fe Iustin Pop
      return fn(*args, **kwargs)
132 e0e916fe Iustin Pop
    finally:
133 e0e916fe Iustin Pop
      Shutdown()
134 e0e916fe Iustin Pop
  return wrapper
135 e0e916fe Iustin Pop
136 e0e916fe Iustin Pop
137 30474135 Michael Hanselmann
def _Compress(data):
138 30474135 Michael Hanselmann
  """Compresses a string for transport over RPC.
139 30474135 Michael Hanselmann

140 30474135 Michael Hanselmann
  Small amounts of data are not compressed.
141 30474135 Michael Hanselmann

142 30474135 Michael Hanselmann
  @type data: str
143 30474135 Michael Hanselmann
  @param data: Data
144 30474135 Michael Hanselmann
  @rtype: tuple
145 30474135 Michael Hanselmann
  @return: Encoded data to send
146 30474135 Michael Hanselmann

147 30474135 Michael Hanselmann
  """
148 30474135 Michael Hanselmann
  # Small amounts of data are not compressed
149 30474135 Michael Hanselmann
  if len(data) < 512:
150 30474135 Michael Hanselmann
    return (constants.RPC_ENCODING_NONE, data)
151 30474135 Michael Hanselmann
152 30474135 Michael Hanselmann
  # Compress with zlib and encode in base64
153 30474135 Michael Hanselmann
  return (constants.RPC_ENCODING_ZLIB_BASE64,
154 30474135 Michael Hanselmann
          base64.b64encode(zlib.compress(data, 3)))
155 30474135 Michael Hanselmann
156 30474135 Michael Hanselmann
157 781de953 Iustin Pop
class RpcResult(object):
158 781de953 Iustin Pop
  """RPC Result class.
159 781de953 Iustin Pop

160 781de953 Iustin Pop
  This class holds an RPC result. It is needed since in multi-node
161 781de953 Iustin Pop
  calls we can't raise an exception just because one one out of many
162 781de953 Iustin Pop
  failed, and therefore we use this class to encapsulate the result.
163 781de953 Iustin Pop

164 5bbd3f7f Michael Hanselmann
  @ivar data: the data payload, for successful results, or None
165 ed83f5cc Iustin Pop
  @ivar call: the name of the RPC call
166 ed83f5cc Iustin Pop
  @ivar node: the name of the node to which we made the call
167 ed83f5cc Iustin Pop
  @ivar offline: whether the operation failed because the node was
168 ed83f5cc Iustin Pop
      offline, as opposed to actual failure; offline=True will always
169 ed83f5cc Iustin Pop
      imply failed=True, in order to allow simpler checking if
170 ed83f5cc Iustin Pop
      the user doesn't care about the exact failure mode
171 4c4e4e1e Iustin Pop
  @ivar fail_msg: the error message if the call failed
172 ed83f5cc Iustin Pop

173 781de953 Iustin Pop
  """
174 ed83f5cc Iustin Pop
  def __init__(self, data=None, failed=False, offline=False,
175 ed83f5cc Iustin Pop
               call=None, node=None):
176 ed83f5cc Iustin Pop
    self.offline = offline
177 ed83f5cc Iustin Pop
    self.call = call
178 ed83f5cc Iustin Pop
    self.node = node
179 1645d22d Michael Hanselmann
180 ed83f5cc Iustin Pop
    if offline:
181 4c4e4e1e Iustin Pop
      self.fail_msg = "Node is marked offline"
182 f2def43a Iustin Pop
      self.data = self.payload = None
183 ed83f5cc Iustin Pop
    elif failed:
184 4c4e4e1e Iustin Pop
      self.fail_msg = self._EnsureErr(data)
185 f2def43a Iustin Pop
      self.data = self.payload = None
186 781de953 Iustin Pop
    else:
187 781de953 Iustin Pop
      self.data = data
188 d3c8b360 Iustin Pop
      if not isinstance(self.data, (tuple, list)):
189 4c4e4e1e Iustin Pop
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190 4c4e4e1e Iustin Pop
                         type(self.data))
191 1645d22d Michael Hanselmann
        self.payload = None
192 d3c8b360 Iustin Pop
      elif len(data) != 2:
193 4c4e4e1e Iustin Pop
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
194 4c4e4e1e Iustin Pop
                         "expected 2" % len(self.data))
195 1645d22d Michael Hanselmann
        self.payload = None
196 d3c8b360 Iustin Pop
      elif not self.data[0]:
197 4c4e4e1e Iustin Pop
        self.fail_msg = self._EnsureErr(self.data[1])
198 1645d22d Michael Hanselmann
        self.payload = None
199 f2def43a Iustin Pop
      else:
200 d3c8b360 Iustin Pop
        # finally success
201 4c4e4e1e Iustin Pop
        self.fail_msg = None
202 d3c8b360 Iustin Pop
        self.payload = data[1]
203 d3c8b360 Iustin Pop
204 2c0f74f2 Iustin Pop
    for attr_name in ["call", "data", "fail_msg",
205 2c0f74f2 Iustin Pop
                      "node", "offline", "payload"]:
206 2c0f74f2 Iustin Pop
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207 1645d22d Michael Hanselmann
208 d3c8b360 Iustin Pop
  @staticmethod
209 d3c8b360 Iustin Pop
  def _EnsureErr(val):
210 d3c8b360 Iustin Pop
    """Helper to ensure we return a 'True' value for error."""
211 d3c8b360 Iustin Pop
    if val:
212 d3c8b360 Iustin Pop
      return val
213 d3c8b360 Iustin Pop
    else:
214 d3c8b360 Iustin Pop
      return "No error information"
215 781de953 Iustin Pop
216 045dd6d9 Iustin Pop
  def Raise(self, msg, prereq=False, ecode=None):
217 781de953 Iustin Pop
    """If the result has failed, raise an OpExecError.
218 781de953 Iustin Pop

219 781de953 Iustin Pop
    This is used so that LU code doesn't have to check for each
220 781de953 Iustin Pop
    result, but instead can call this function.
221 781de953 Iustin Pop

222 781de953 Iustin Pop
    """
223 4c4e4e1e Iustin Pop
    if not self.fail_msg:
224 4c4e4e1e Iustin Pop
      return
225 4c4e4e1e Iustin Pop
226 4c4e4e1e Iustin Pop
    if not msg: # one could pass None for default message
227 4c4e4e1e Iustin Pop
      msg = ("Call '%s' to node '%s' has failed: %s" %
228 4c4e4e1e Iustin Pop
             (self.call, self.node, self.fail_msg))
229 4c4e4e1e Iustin Pop
    else:
230 4c4e4e1e Iustin Pop
      msg = "%s: %s" % (msg, self.fail_msg)
231 4c4e4e1e Iustin Pop
    if prereq:
232 4c4e4e1e Iustin Pop
      ec = errors.OpPrereqError
233 4c4e4e1e Iustin Pop
    else:
234 4c4e4e1e Iustin Pop
      ec = errors.OpExecError
235 045dd6d9 Iustin Pop
    if ecode is not None:
236 27137e55 Iustin Pop
      args = (msg, ecode)
237 045dd6d9 Iustin Pop
    else:
238 045dd6d9 Iustin Pop
      args = (msg, )
239 b459a848 Andrea Spadaccini
    raise ec(*args) # pylint: disable=W0142
240 781de953 Iustin Pop
241 781de953 Iustin Pop
242 00267bfe Michael Hanselmann
def _SsconfResolver(node_list,
243 00267bfe Michael Hanselmann
                    ssc=ssconf.SimpleStore,
244 00267bfe Michael Hanselmann
                    nslookup_fn=netutils.Hostname.GetIP):
245 eb202c13 Manuel Franceschini
  """Return addresses for given node names.
246 eb202c13 Manuel Franceschini

247 eb202c13 Manuel Franceschini
  @type node_list: list
248 eb202c13 Manuel Franceschini
  @param node_list: List of node names
249 eb202c13 Manuel Franceschini
  @type ssc: class
250 eb202c13 Manuel Franceschini
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
251 17f7fd27 Manuel Franceschini
  @type nslookup_fn: callable
252 17f7fd27 Manuel Franceschini
  @param nslookup_fn: function use to do NS lookup
253 00267bfe Michael Hanselmann
  @rtype: list of tuple; (string, string)
254 00267bfe Michael Hanselmann
  @return: List of tuples containing node name and IP address
255 eb202c13 Manuel Franceschini

256 eb202c13 Manuel Franceschini
  """
257 b43dcc5a Manuel Franceschini
  ss = ssc()
258 b43dcc5a Manuel Franceschini
  iplist = ss.GetNodePrimaryIPList()
259 b43dcc5a Manuel Franceschini
  family = ss.GetPrimaryIPFamily()
260 b705c7a6 Manuel Franceschini
  ipmap = dict(entry.split() for entry in iplist)
261 00267bfe Michael Hanselmann
262 00267bfe Michael Hanselmann
  result = []
263 b705c7a6 Manuel Franceschini
  for node in node_list:
264 00267bfe Michael Hanselmann
    ip = ipmap.get(node)
265 00267bfe Michael Hanselmann
    if ip is None:
266 00267bfe Michael Hanselmann
      ip = nslookup_fn(node, family=family)
267 00267bfe Michael Hanselmann
    result.append((node, ip))
268 00267bfe Michael Hanselmann
269 00267bfe Michael Hanselmann
  return result
270 00267bfe Michael Hanselmann
271 00267bfe Michael Hanselmann
272 00267bfe Michael Hanselmann
class _StaticResolver:
273 00267bfe Michael Hanselmann
  def __init__(self, addresses):
274 00267bfe Michael Hanselmann
    """Initializes this class.
275 00267bfe Michael Hanselmann

276 00267bfe Michael Hanselmann
    """
277 00267bfe Michael Hanselmann
    self._addresses = addresses
278 00267bfe Michael Hanselmann
279 00267bfe Michael Hanselmann
  def __call__(self, hosts):
280 00267bfe Michael Hanselmann
    """Returns static addresses for hosts.
281 00267bfe Michael Hanselmann

282 00267bfe Michael Hanselmann
    """
283 00267bfe Michael Hanselmann
    assert len(hosts) == len(self._addresses)
284 00267bfe Michael Hanselmann
    return zip(hosts, self._addresses)
285 00267bfe Michael Hanselmann
286 eb202c13 Manuel Franceschini
287 00267bfe Michael Hanselmann
def _CheckConfigNode(name, node):
288 00267bfe Michael Hanselmann
  """Checks if a node is online.
289 eb202c13 Manuel Franceschini

290 00267bfe Michael Hanselmann
  @type name: string
291 00267bfe Michael Hanselmann
  @param name: Node name
292 00267bfe Michael Hanselmann
  @type node: L{objects.Node} or None
293 00267bfe Michael Hanselmann
  @param node: Node object
294 eb202c13 Manuel Franceschini

295 00267bfe Michael Hanselmann
  """
296 00267bfe Michael Hanselmann
  if node is None:
297 00267bfe Michael Hanselmann
    # Depend on DNS for name resolution
298 00267bfe Michael Hanselmann
    ip = name
299 00267bfe Michael Hanselmann
  elif node.offline:
300 00267bfe Michael Hanselmann
    ip = _OFFLINE
301 00267bfe Michael Hanselmann
  else:
302 00267bfe Michael Hanselmann
    ip = node.primary_ip
303 00267bfe Michael Hanselmann
  return (name, ip)
304 a8083063 Iustin Pop
305 a8083063 Iustin Pop
306 00267bfe Michael Hanselmann
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
307 00267bfe Michael Hanselmann
  """Calculate node addresses using configuration.
308 a8083063 Iustin Pop

309 a8083063 Iustin Pop
  """
310 00267bfe Michael Hanselmann
  # Special case for single-host lookups
311 00267bfe Michael Hanselmann
  if len(hosts) == 1:
312 00267bfe Michael Hanselmann
    (name, ) = hosts
313 00267bfe Michael Hanselmann
    return [_CheckConfigNode(name, single_node_fn(name))]
314 00267bfe Michael Hanselmann
  else:
315 00267bfe Michael Hanselmann
    all_nodes = all_nodes_fn()
316 00267bfe Michael Hanselmann
    return [_CheckConfigNode(name, all_nodes.get(name, None))
317 00267bfe Michael Hanselmann
            for name in hosts]
318 00267bfe Michael Hanselmann
319 00267bfe Michael Hanselmann
320 00267bfe Michael Hanselmann
class _RpcProcessor:
321 aea5caef Michael Hanselmann
  def __init__(self, resolver, port, lock_monitor_cb=None):
322 00267bfe Michael Hanselmann
    """Initializes this class.
323 00267bfe Michael Hanselmann

324 00267bfe Michael Hanselmann
    @param resolver: callable accepting a list of hostnames, returning a list
325 00267bfe Michael Hanselmann
      of tuples containing name and IP address (IP address can be the name or
326 00267bfe Michael Hanselmann
      the special value L{_OFFLINE} to mark offline machines)
327 00267bfe Michael Hanselmann
    @type port: int
328 00267bfe Michael Hanselmann
    @param port: TCP port
329 aea5caef Michael Hanselmann
    @param lock_monitor_cb: Callable for registering with lock monitor
330 3ef3c771 Iustin Pop

331 a8083063 Iustin Pop
    """
332 00267bfe Michael Hanselmann
    self._resolver = resolver
333 00267bfe Michael Hanselmann
    self._port = port
334 aea5caef Michael Hanselmann
    self._lock_monitor_cb = lock_monitor_cb
335 eb202c13 Manuel Franceschini
336 00267bfe Michael Hanselmann
  @staticmethod
337 00267bfe Michael Hanselmann
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338 00267bfe Michael Hanselmann
    """Prepares requests by sorting offline hosts into separate list.
339 eb202c13 Manuel Franceschini

340 00267bfe Michael Hanselmann
    """
341 00267bfe Michael Hanselmann
    results = {}
342 00267bfe Michael Hanselmann
    requests = {}
343 bdf7d8c0 Iustin Pop
344 00267bfe Michael Hanselmann
    for (name, ip) in hosts:
345 00267bfe Michael Hanselmann
      if ip is _OFFLINE:
346 00267bfe Michael Hanselmann
        # Node is marked as offline
347 00267bfe Michael Hanselmann
        results[name] = RpcResult(node=name, offline=True, call=procedure)
348 00267bfe Michael Hanselmann
      else:
349 00267bfe Michael Hanselmann
        requests[name] = \
350 00267bfe Michael Hanselmann
          http.client.HttpClientRequest(str(ip), port,
351 00267bfe Michael Hanselmann
                                        http.HTTP_PUT, str("/%s" % procedure),
352 00267bfe Michael Hanselmann
                                        headers=_RPC_CLIENT_HEADERS,
353 00267bfe Michael Hanselmann
                                        post_data=body,
354 7cb2d205 Michael Hanselmann
                                        read_timeout=read_timeout,
355 abbf2cd9 Michael Hanselmann
                                        nicename="%s/%s" % (name, procedure),
356 abbf2cd9 Michael Hanselmann
                                        curl_config_fn=_ConfigRpcCurl)
357 a8083063 Iustin Pop
358 00267bfe Michael Hanselmann
    return (results, requests)
359 00267bfe Michael Hanselmann
360 00267bfe Michael Hanselmann
  @staticmethod
361 00267bfe Michael Hanselmann
  def _CombineResults(results, requests, procedure):
362 00267bfe Michael Hanselmann
    """Combines pre-computed results for offline hosts with actual call results.
363 bdf7d8c0 Iustin Pop

364 a8083063 Iustin Pop
    """
365 00267bfe Michael Hanselmann
    for name, req in requests.items():
366 00267bfe Michael Hanselmann
      if req.success and req.resp_status_code == http.HTTP_OK:
367 00267bfe Michael Hanselmann
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
368 00267bfe Michael Hanselmann
                                node=name, call=procedure)
369 00267bfe Michael Hanselmann
      else:
370 00267bfe Michael Hanselmann
        # TODO: Better error reporting
371 00267bfe Michael Hanselmann
        if req.error:
372 00267bfe Michael Hanselmann
          msg = req.error
373 00267bfe Michael Hanselmann
        else:
374 00267bfe Michael Hanselmann
          msg = req.resp_body
375 eb202c13 Manuel Franceschini
376 00267bfe Michael Hanselmann
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
377 00267bfe Michael Hanselmann
        host_result = RpcResult(data=msg, failed=True, node=name,
378 00267bfe Michael Hanselmann
                                call=procedure)
379 ecfe9491 Michael Hanselmann
380 00267bfe Michael Hanselmann
      results[name] = host_result
381 92fd2250 Iustin Pop
382 00267bfe Michael Hanselmann
    return results
383 a8083063 Iustin Pop
384 abbf2cd9 Michael Hanselmann
  def __call__(self, hosts, procedure, body, read_timeout=None,
385 abbf2cd9 Michael Hanselmann
               _req_process_fn=http.client.ProcessRequests):
386 00267bfe Michael Hanselmann
    """Makes an RPC request to a number of nodes.
387 ecfe9491 Michael Hanselmann

388 00267bfe Michael Hanselmann
    @type hosts: sequence
389 00267bfe Michael Hanselmann
    @param hosts: Hostnames
390 00267bfe Michael Hanselmann
    @type procedure: string
391 00267bfe Michael Hanselmann
    @param procedure: Request path
392 00267bfe Michael Hanselmann
    @type body: string
393 00267bfe Michael Hanselmann
    @param body: Request body
394 00267bfe Michael Hanselmann
    @type read_timeout: int or None
395 00267bfe Michael Hanselmann
    @param read_timeout: Read timeout for request
396 a8083063 Iustin Pop

397 a8083063 Iustin Pop
    """
398 83e7af18 Michael Hanselmann
    assert read_timeout is not None, \
399 83e7af18 Michael Hanselmann
      "Missing RPC read timeout for procedure '%s'" % procedure
400 a8083063 Iustin Pop
401 00267bfe Michael Hanselmann
    (results, requests) = \
402 00267bfe Michael Hanselmann
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
403 00267bfe Michael Hanselmann
                            str(body), read_timeout)
404 a8083063 Iustin Pop
405 abbf2cd9 Michael Hanselmann
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
406 a8083063 Iustin Pop
407 00267bfe Michael Hanselmann
    assert not frozenset(results).intersection(requests)
408 ecfe9491 Michael Hanselmann
409 00267bfe Michael Hanselmann
    return self._CombineResults(results, requests, procedure)
410 a8083063 Iustin Pop
411 a8083063 Iustin Pop
412 db04ce5d Michael Hanselmann
class RpcRunner(_generated_rpc.RpcClientDefault,
413 415a7304 Michael Hanselmann
                _generated_rpc.RpcClientBootstrap,
414 415a7304 Michael Hanselmann
                _generated_rpc.RpcClientConfig):
415 87b3cb26 Michael Hanselmann
  """RPC runner class.
416 a8083063 Iustin Pop

417 87b3cb26 Michael Hanselmann
  """
418 87b3cb26 Michael Hanselmann
  def __init__(self, context):
419 87b3cb26 Michael Hanselmann
    """Initialized the RPC runner.
420 a8083063 Iustin Pop

421 87b3cb26 Michael Hanselmann
    @type context: C{masterd.GanetiContext}
422 87b3cb26 Michael Hanselmann
    @param context: Ganeti context
423 a8083063 Iustin Pop

424 72737a7f Iustin Pop
    """
425 db04ce5d Michael Hanselmann
    # Pylint doesn't recognize multiple inheritance properly, see
426 db04ce5d Michael Hanselmann
    # <http://www.logilab.org/ticket/36586> and
427 db04ce5d Michael Hanselmann
    # <http://www.logilab.org/ticket/35642>
428 db04ce5d Michael Hanselmann
    # pylint: disable=W0233
429 415a7304 Michael Hanselmann
    _generated_rpc.RpcClientConfig.__init__(self)
430 db04ce5d Michael Hanselmann
    _generated_rpc.RpcClientBootstrap.__init__(self)
431 200de241 Michael Hanselmann
    _generated_rpc.RpcClientDefault.__init__(self)
432 200de241 Michael Hanselmann
433 87b3cb26 Michael Hanselmann
    self._cfg = context.cfg
434 00267bfe Michael Hanselmann
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
435 00267bfe Michael Hanselmann
                                              self._cfg.GetNodeInfo,
436 00267bfe Michael Hanselmann
                                              self._cfg.GetAllNodesInfo),
437 aea5caef Michael Hanselmann
                               netutils.GetDaemonPort(constants.NODED),
438 aea5caef Michael Hanselmann
                               lock_monitor_cb=context.glm.AddToLockMonitor)
439 a8083063 Iustin Pop
440 1bdcbbab Iustin Pop
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
441 26ba2bd8 Iustin Pop
    """Convert the given instance to a dict.
442 26ba2bd8 Iustin Pop

443 26ba2bd8 Iustin Pop
    This is done via the instance's ToDict() method and additionally
444 26ba2bd8 Iustin Pop
    we fill the hvparams with the cluster defaults.
445 26ba2bd8 Iustin Pop

446 26ba2bd8 Iustin Pop
    @type instance: L{objects.Instance}
447 26ba2bd8 Iustin Pop
    @param instance: an Instance object
448 0eca8e0c Iustin Pop
    @type hvp: dict or None
449 5bbd3f7f Michael Hanselmann
    @param hvp: a dictionary with overridden hypervisor parameters
450 0eca8e0c Iustin Pop
    @type bep: dict or None
451 5bbd3f7f Michael Hanselmann
    @param bep: a dictionary with overridden backend parameters
452 1bdcbbab Iustin Pop
    @type osp: dict or None
453 8d8c4eff Michael Hanselmann
    @param osp: a dictionary with overridden os parameters
454 26ba2bd8 Iustin Pop
    @rtype: dict
455 26ba2bd8 Iustin Pop
    @return: the instance dict, with the hvparams filled with the
456 26ba2bd8 Iustin Pop
        cluster defaults
457 26ba2bd8 Iustin Pop

458 26ba2bd8 Iustin Pop
    """
459 26ba2bd8 Iustin Pop
    idict = instance.ToDict()
460 5b442704 Iustin Pop
    cluster = self._cfg.GetClusterInfo()
461 5b442704 Iustin Pop
    idict["hvparams"] = cluster.FillHV(instance)
462 0eca8e0c Iustin Pop
    if hvp is not None:
463 0eca8e0c Iustin Pop
      idict["hvparams"].update(hvp)
464 5b442704 Iustin Pop
    idict["beparams"] = cluster.FillBE(instance)
465 0eca8e0c Iustin Pop
    if bep is not None:
466 0eca8e0c Iustin Pop
      idict["beparams"].update(bep)
467 1bdcbbab Iustin Pop
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
468 1bdcbbab Iustin Pop
    if osp is not None:
469 1bdcbbab Iustin Pop
      idict["osparams"].update(osp)
470 b848ce79 Guido Trotter
    for nic in idict["nics"]:
471 b848ce79 Guido Trotter
      nic['nicparams'] = objects.FillDict(
472 b848ce79 Guido Trotter
        cluster.nicparams[constants.PP_DEFAULT],
473 b848ce79 Guido Trotter
        nic['nicparams'])
474 26ba2bd8 Iustin Pop
    return idict
475 26ba2bd8 Iustin Pop
476 c4de9b7a Michael Hanselmann
  def _InstDictHvpBep(self, (instance, hvp, bep)):
477 c4de9b7a Michael Hanselmann
    """Wrapper for L{_InstDict}.
478 c4de9b7a Michael Hanselmann

479 c4de9b7a Michael Hanselmann
    """
480 c4de9b7a Michael Hanselmann
    return self._InstDict(instance, hvp=hvp, bep=bep)
481 c4de9b7a Michael Hanselmann
482 c4de9b7a Michael Hanselmann
  def _InstDictOsp(self, (instance, osparams)):
483 c4de9b7a Michael Hanselmann
    """Wrapper for L{_InstDict}.
484 c4de9b7a Michael Hanselmann

485 c4de9b7a Michael Hanselmann
    """
486 c4de9b7a Michael Hanselmann
    return self._InstDict(instance, osp=osparams)
487 c4de9b7a Michael Hanselmann
488 200de241 Michael Hanselmann
  def _Call(self, node_list, procedure, timeout, args):
489 200de241 Michael Hanselmann
    """Entry point for automatically generated RPC wrappers.
490 200de241 Michael Hanselmann

491 200de241 Michael Hanselmann
    """
492 160e2921 Iustin Pop
    body = serializer.DumpJson(args, indent=False)
493 9a525d83 Michael Hanselmann
494 4fbe3851 Michael Hanselmann
    return self._proc(node_list, procedure, body, read_timeout=timeout)
495 9a525d83 Michael Hanselmann
496 efc71a02 Michael Hanselmann
  @staticmethod
497 e3ac8406 Andrea Spadaccini
  def _MigrationStatusPostProc(result):
498 e3ac8406 Andrea Spadaccini
    if not result.fail_msg and result.payload is not None:
499 e3ac8406 Andrea Spadaccini
      result.payload = objects.MigrationStatus.FromDict(result.payload)
500 e3ac8406 Andrea Spadaccini
    return result
501 e3ac8406 Andrea Spadaccini
502 e3ac8406 Andrea Spadaccini
  @staticmethod
503 efc71a02 Michael Hanselmann
  def _BlockdevFindPostProc(result):
504 efc71a02 Michael Hanselmann
    if not result.fail_msg and result.payload is not None:
505 efc71a02 Michael Hanselmann
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
506 efc71a02 Michael Hanselmann
    return result
507 efc71a02 Michael Hanselmann
508 efc71a02 Michael Hanselmann
  @staticmethod
509 efc71a02 Michael Hanselmann
  def _BlockdevGetMirrorStatusPostProc(result):
510 efc71a02 Michael Hanselmann
    if not result.fail_msg:
511 efc71a02 Michael Hanselmann
      result.payload = [objects.BlockDevStatus.FromDict(i)
512 efc71a02 Michael Hanselmann
                        for i in result.payload]
513 efc71a02 Michael Hanselmann
    return result
514 efc71a02 Michael Hanselmann
515 efc71a02 Michael Hanselmann
  @staticmethod
516 efc71a02 Michael Hanselmann
  def _BlockdevGetMirrorStatusMultiPostProc(result):
517 efc71a02 Michael Hanselmann
    for nres in result.values():
518 efc71a02 Michael Hanselmann
      if nres.fail_msg:
519 efc71a02 Michael Hanselmann
        continue
520 efc71a02 Michael Hanselmann
521 efc71a02 Michael Hanselmann
      for idx, (success, status) in enumerate(nres.payload):
522 efc71a02 Michael Hanselmann
        if success:
523 efc71a02 Michael Hanselmann
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
524 efc71a02 Michael Hanselmann
525 efc71a02 Michael Hanselmann
    return result
526 efc71a02 Michael Hanselmann
527 efc71a02 Michael Hanselmann
  @staticmethod
528 efc71a02 Michael Hanselmann
  def _OsGetPostProc(result):
529 efc71a02 Michael Hanselmann
    if not result.fail_msg and isinstance(result.payload, dict):
530 efc71a02 Michael Hanselmann
      result.payload = objects.OS.FromDict(result.payload)
531 efc71a02 Michael Hanselmann
    return result
532 efc71a02 Michael Hanselmann
533 efc71a02 Michael Hanselmann
  @staticmethod
534 efc71a02 Michael Hanselmann
  def _PrepareFinalizeExportDisks(snap_disks):
535 efc71a02 Michael Hanselmann
    flat_disks = []
536 efc71a02 Michael Hanselmann
537 efc71a02 Michael Hanselmann
    for disk in snap_disks:
538 efc71a02 Michael Hanselmann
      if isinstance(disk, bool):
539 efc71a02 Michael Hanselmann
        flat_disks.append(disk)
540 efc71a02 Michael Hanselmann
      else:
541 efc71a02 Michael Hanselmann
        flat_disks.append(disk.ToDict())
542 efc71a02 Michael Hanselmann
543 efc71a02 Michael Hanselmann
    return flat_disks
544 efc71a02 Michael Hanselmann
545 efc71a02 Michael Hanselmann
  @staticmethod
546 efc71a02 Michael Hanselmann
  def _ImpExpStatusPostProc(result):
547 efc71a02 Michael Hanselmann
    """Post-processor for import/export status.
548 efc71a02 Michael Hanselmann

549 efc71a02 Michael Hanselmann
    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
550 efc71a02 Michael Hanselmann
    @return: Returns a list of the state of each named import/export or None if
551 efc71a02 Michael Hanselmann
             a status couldn't be retrieved
552 efc71a02 Michael Hanselmann

553 efc71a02 Michael Hanselmann
    """
554 efc71a02 Michael Hanselmann
    if not result.fail_msg:
555 efc71a02 Michael Hanselmann
      decoded = []
556 efc71a02 Michael Hanselmann
557 efc71a02 Michael Hanselmann
      for i in result.payload:
558 efc71a02 Michael Hanselmann
        if i is None:
559 efc71a02 Michael Hanselmann
          decoded.append(None)
560 efc71a02 Michael Hanselmann
          continue
561 efc71a02 Michael Hanselmann
        decoded.append(objects.ImportExportStatus.FromDict(i))
562 efc71a02 Michael Hanselmann
563 efc71a02 Michael Hanselmann
      result.payload = decoded
564 efc71a02 Michael Hanselmann
565 efc71a02 Michael Hanselmann
    return result
566 efc71a02 Michael Hanselmann
567 46c293f0 Michael Hanselmann
  @staticmethod
568 46c293f0 Michael Hanselmann
  def _EncodeImportExportIO(ieio, ieioargs):
569 46c293f0 Michael Hanselmann
    """Encodes import/export I/O information.
570 46c293f0 Michael Hanselmann

571 46c293f0 Michael Hanselmann
    """
572 46c293f0 Michael Hanselmann
    if ieio == constants.IEIO_RAW_DISK:
573 46c293f0 Michael Hanselmann
      assert len(ieioargs) == 1
574 46c293f0 Michael Hanselmann
      return (ieioargs[0].ToDict(), )
575 46c293f0 Michael Hanselmann
576 46c293f0 Michael Hanselmann
    if ieio == constants.IEIO_SCRIPT:
577 46c293f0 Michael Hanselmann
      assert len(ieioargs) == 2
578 46c293f0 Michael Hanselmann
      return (ieioargs[0].ToDict(), ieioargs[1])
579 46c293f0 Michael Hanselmann
580 46c293f0 Michael Hanselmann
    return ieioargs
581 46c293f0 Michael Hanselmann
582 415a7304 Michael Hanselmann
  @staticmethod
583 415a7304 Michael Hanselmann
  def _PrepareFileUpload(filename):
584 415a7304 Michael Hanselmann
    """Loads a file and prepares it for an upload to nodes.
585 6b294c53 Iustin Pop

586 72737a7f Iustin Pop
    """
587 415a7304 Michael Hanselmann
    data = _Compress(utils.ReadFile(filename))
588 415a7304 Michael Hanselmann
    st = os.stat(filename)
589 9a914f7a René Nussbaumer
    getents = runtime.GetEnts()
590 415a7304 Michael Hanselmann
    return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
591 415a7304 Michael Hanselmann
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
592 6ddc95ec Michael Hanselmann
593 415a7304 Michael Hanselmann
  #
594 415a7304 Michael Hanselmann
  # Begin RPC calls
595 415a7304 Michael Hanselmann
  #
596 6ddc95ec Michael Hanselmann
597 0436da49 Michael Hanselmann
  def call_test_delay(self, node_list, duration, read_timeout=None):
598 72737a7f Iustin Pop
    """Sleep for a fixed time on given node(s).
599 06009e27 Iustin Pop

600 72737a7f Iustin Pop
    This is a multi-node call.
601 06009e27 Iustin Pop

602 72737a7f Iustin Pop
    """
603 0436da49 Michael Hanselmann
    assert read_timeout is None
604 0436da49 Michael Hanselmann
    return self.call_test_delay(node_list, duration,
605 0436da49 Michael Hanselmann
                                read_timeout=int(duration + 5))
606 5e04ed8b Manuel Franceschini
607 fb1ffbca Michael Hanselmann
608 fb1ffbca Michael Hanselmann
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
609 fb1ffbca Michael Hanselmann
  """RPC wrappers for job queue.
610 fb1ffbca Michael Hanselmann

611 fb1ffbca Michael Hanselmann
  """
612 fb1ffbca Michael Hanselmann
  _Compress = staticmethod(_Compress)
613 fb1ffbca Michael Hanselmann
614 fb1ffbca Michael Hanselmann
  def __init__(self, context, address_list):
615 fb1ffbca Michael Hanselmann
    """Initializes this class.
616 fb1ffbca Michael Hanselmann

617 fb1ffbca Michael Hanselmann
    """
618 fb1ffbca Michael Hanselmann
    _generated_rpc.RpcClientJobQueue.__init__(self)
619 fb1ffbca Michael Hanselmann
620 fb1ffbca Michael Hanselmann
    if address_list is None:
621 fb1ffbca Michael Hanselmann
      resolver = _SsconfResolver
622 fb1ffbca Michael Hanselmann
    else:
623 fb1ffbca Michael Hanselmann
      # Caller provided an address list
624 fb1ffbca Michael Hanselmann
      resolver = _StaticResolver(address_list)
625 fb1ffbca Michael Hanselmann
626 fb1ffbca Michael Hanselmann
    self._proc = _RpcProcessor(resolver,
627 fb1ffbca Michael Hanselmann
                               netutils.GetDaemonPort(constants.NODED),
628 fb1ffbca Michael Hanselmann
                               lock_monitor_cb=context.glm.AddToLockMonitor)
629 fb1ffbca Michael Hanselmann
630 fb1ffbca Michael Hanselmann
  def _Call(self, node_list, procedure, timeout, args):
631 fb1ffbca Michael Hanselmann
    """Entry point for automatically generated RPC wrappers.
632 fb1ffbca Michael Hanselmann

633 fb1ffbca Michael Hanselmann
    """
634 fb1ffbca Michael Hanselmann
    body = serializer.DumpJson(args, indent=False)
635 fb1ffbca Michael Hanselmann
636 fb1ffbca Michael Hanselmann
    return self._proc(node_list, procedure, body, read_timeout=timeout)
637 db04ce5d Michael Hanselmann
638 db04ce5d Michael Hanselmann
639 db04ce5d Michael Hanselmann
class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
640 db04ce5d Michael Hanselmann
  """RPC wrappers for bootstrapping.
641 db04ce5d Michael Hanselmann

642 db04ce5d Michael Hanselmann
  """
643 db04ce5d Michael Hanselmann
  def __init__(self):
644 db04ce5d Michael Hanselmann
    """Initializes this class.
645 db04ce5d Michael Hanselmann

646 db04ce5d Michael Hanselmann
    """
647 db04ce5d Michael Hanselmann
    _generated_rpc.RpcClientBootstrap.__init__(self)
648 db04ce5d Michael Hanselmann
649 db04ce5d Michael Hanselmann
    self._proc = _RpcProcessor(_SsconfResolver,
650 db04ce5d Michael Hanselmann
                               netutils.GetDaemonPort(constants.NODED))
651 db04ce5d Michael Hanselmann
652 db04ce5d Michael Hanselmann
  def _Call(self, node_list, procedure, timeout, args):
653 db04ce5d Michael Hanselmann
    """Entry point for automatically generated RPC wrappers.
654 db04ce5d Michael Hanselmann

655 db04ce5d Michael Hanselmann
    """
656 db04ce5d Michael Hanselmann
    body = serializer.DumpJson(args, indent=False)
657 db04ce5d Michael Hanselmann
658 db04ce5d Michael Hanselmann
    return self._proc(node_list, procedure, body, read_timeout=timeout)
659 415a7304 Michael Hanselmann
660 415a7304 Michael Hanselmann
661 415a7304 Michael Hanselmann
class ConfigRunner(_generated_rpc.RpcClientConfig):
662 415a7304 Michael Hanselmann
  """RPC wrappers for L{config}.
663 415a7304 Michael Hanselmann

664 415a7304 Michael Hanselmann
  """
665 415a7304 Michael Hanselmann
  _PrepareFileUpload = \
666 415a7304 Michael Hanselmann
    staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212
667 415a7304 Michael Hanselmann
668 415a7304 Michael Hanselmann
  def __init__(self, address_list):
669 415a7304 Michael Hanselmann
    """Initializes this class.
670 415a7304 Michael Hanselmann

671 415a7304 Michael Hanselmann
    """
672 415a7304 Michael Hanselmann
    _generated_rpc.RpcClientConfig.__init__(self)
673 415a7304 Michael Hanselmann
674 415a7304 Michael Hanselmann
    if address_list is None:
675 415a7304 Michael Hanselmann
      resolver = _SsconfResolver
676 415a7304 Michael Hanselmann
    else:
677 415a7304 Michael Hanselmann
      # Caller provided an address list
678 415a7304 Michael Hanselmann
      resolver = _StaticResolver(address_list)
679 415a7304 Michael Hanselmann
680 415a7304 Michael Hanselmann
    self._proc = _RpcProcessor(resolver,
681 415a7304 Michael Hanselmann
                               netutils.GetDaemonPort(constants.NODED))
682 415a7304 Michael Hanselmann
683 415a7304 Michael Hanselmann
  def _Call(self, node_list, procedure, timeout, args):
684 415a7304 Michael Hanselmann
    """Entry point for automatically generated RPC wrappers.
685 415a7304 Michael Hanselmann

686 415a7304 Michael Hanselmann
    """
687 415a7304 Michael Hanselmann
    body = serializer.DumpJson(args, indent=False)
688 415a7304 Michael Hanselmann
689 415a7304 Michael Hanselmann
    return self._proc(node_list, procedure, body, read_timeout=timeout)