Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ fb1ffbca

History | View | Annotate | Download (23.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Timeout for connecting to nodes (seconds)
59
_RPC_CONNECT_TIMEOUT = 5
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
# Various time constants for the timeout table
67
_TMO_URGENT = 60 # one minute
68
_TMO_FAST = 5 * 60 # five minutes
69
_TMO_NORMAL = 15 * 60 # 15 minutes
70
_TMO_SLOW = 3600 # one hour
71
_TMO_4HRS = 4 * 3600
72
_TMO_1DAY = 86400
73

    
74
# Timeout table that will be built later by decorators
75
# Guidelines for choosing timeouts:
76
# - call used during watcher: timeout -> 1min, _TMO_URGENT
77
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
78
# - other calls: 15 min, _TMO_NORMAL
79
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
80

    
81
_TIMEOUTS = {
82
}
83

    
84
#: Special value to describe an offline host
85
_OFFLINE = object()
86

    
87

    
88
def Init():
89
  """Initializes the module-global HTTP client manager.
90

91
  Must be called before using any RPC function and while exactly one thread is
92
  running.
93

94
  """
95
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
96
  # one thread running. This check is just a safety measure -- it doesn't
97
  # cover all cases.
98
  assert threading.activeCount() == 1, \
99
         "Found more than one active thread when initializing pycURL"
100

    
101
  logging.info("Using PycURL %s", pycurl.version)
102

    
103
  pycurl.global_init(pycurl.GLOBAL_ALL)
104

    
105

    
106
def Shutdown():
107
  """Stops the module-global HTTP client manager.
108

109
  Must be called before quitting the program and while exactly one thread is
110
  running.
111

112
  """
113
  pycurl.global_cleanup()
114

    
115

    
116
def _ConfigRpcCurl(curl):
117
  noded_cert = str(constants.NODED_CERT_FILE)
118

    
119
  curl.setopt(pycurl.FOLLOWLOCATION, False)
120
  curl.setopt(pycurl.CAINFO, noded_cert)
121
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
122
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
123
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
124
  curl.setopt(pycurl.SSLCERT, noded_cert)
125
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
126
  curl.setopt(pycurl.SSLKEY, noded_cert)
127
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
128

    
129

    
130
def _RpcTimeout(secs):
131
  """Timeout decorator.
132

133
  When applied to a rpc call_* function, it updates the global timeout
134
  table with the given function/timeout.
135

136
  """
137
  def decorator(f):
138
    name = f.__name__
139
    assert name.startswith("call_")
140
    _TIMEOUTS[name[len("call_"):]] = secs
141
    return f
142
  return decorator
143

    
144

    
145
def RunWithRPC(fn):
146
  """RPC-wrapper decorator.
147

148
  When applied to a function, it runs it with the RPC system
149
  initialized, and it shutsdown the system afterwards. This means the
150
  function must be called without RPC being initialized.
151

152
  """
153
  def wrapper(*args, **kwargs):
154
    Init()
155
    try:
156
      return fn(*args, **kwargs)
157
    finally:
158
      Shutdown()
159
  return wrapper
160

    
161

    
162
def _Compress(data):
163
  """Compresses a string for transport over RPC.
164

165
  Small amounts of data are not compressed.
166

167
  @type data: str
168
  @param data: Data
169
  @rtype: tuple
170
  @return: Encoded data to send
171

172
  """
173
  # Small amounts of data are not compressed
174
  if len(data) < 512:
175
    return (constants.RPC_ENCODING_NONE, data)
176

    
177
  # Compress with zlib and encode in base64
178
  return (constants.RPC_ENCODING_ZLIB_BASE64,
179
          base64.b64encode(zlib.compress(data, 3)))
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    for attr_name in ["call", "data", "fail_msg",
230
                      "node", "offline", "payload"]:
231
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232

    
233
  @staticmethod
234
  def _EnsureErr(val):
235
    """Helper to ensure we return a 'True' value for error."""
236
    if val:
237
      return val
238
    else:
239
      return "No error information"
240

    
241
  def Raise(self, msg, prereq=False, ecode=None):
242
    """If the result has failed, raise an OpExecError.
243

244
    This is used so that LU code doesn't have to check for each
245
    result, but instead can call this function.
246

247
    """
248
    if not self.fail_msg:
249
      return
250

    
251
    if not msg: # one could pass None for default message
252
      msg = ("Call '%s' to node '%s' has failed: %s" %
253
             (self.call, self.node, self.fail_msg))
254
    else:
255
      msg = "%s: %s" % (msg, self.fail_msg)
256
    if prereq:
257
      ec = errors.OpPrereqError
258
    else:
259
      ec = errors.OpExecError
260
    if ecode is not None:
261
      args = (msg, ecode)
262
    else:
263
      args = (msg, )
264
    raise ec(*args) # pylint: disable=W0142
265

    
266

    
267
def _SsconfResolver(node_list,
268
                    ssc=ssconf.SimpleStore,
269
                    nslookup_fn=netutils.Hostname.GetIP):
270
  """Return addresses for given node names.
271

272
  @type node_list: list
273
  @param node_list: List of node names
274
  @type ssc: class
275
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
276
  @type nslookup_fn: callable
277
  @param nslookup_fn: function use to do NS lookup
278
  @rtype: list of tuple; (string, string)
279
  @return: List of tuples containing node name and IP address
280

281
  """
282
  ss = ssc()
283
  iplist = ss.GetNodePrimaryIPList()
284
  family = ss.GetPrimaryIPFamily()
285
  ipmap = dict(entry.split() for entry in iplist)
286

    
287
  result = []
288
  for node in node_list:
289
    ip = ipmap.get(node)
290
    if ip is None:
291
      ip = nslookup_fn(node, family=family)
292
    result.append((node, ip))
293

    
294
  return result
295

    
296

    
297
class _StaticResolver:
298
  def __init__(self, addresses):
299
    """Initializes this class.
300

301
    """
302
    self._addresses = addresses
303

    
304
  def __call__(self, hosts):
305
    """Returns static addresses for hosts.
306

307
    """
308
    assert len(hosts) == len(self._addresses)
309
    return zip(hosts, self._addresses)
310

    
311

    
312
def _CheckConfigNode(name, node):
313
  """Checks if a node is online.
314

315
  @type name: string
316
  @param name: Node name
317
  @type node: L{objects.Node} or None
318
  @param node: Node object
319

320
  """
321
  if node is None:
322
    # Depend on DNS for name resolution
323
    ip = name
324
  elif node.offline:
325
    ip = _OFFLINE
326
  else:
327
    ip = node.primary_ip
328
  return (name, ip)
329

    
330

    
331
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
332
  """Calculate node addresses using configuration.
333

334
  """
335
  # Special case for single-host lookups
336
  if len(hosts) == 1:
337
    (name, ) = hosts
338
    return [_CheckConfigNode(name, single_node_fn(name))]
339
  else:
340
    all_nodes = all_nodes_fn()
341
    return [_CheckConfigNode(name, all_nodes.get(name, None))
342
            for name in hosts]
343

    
344

    
345
class _RpcProcessor:
346
  def __init__(self, resolver, port, lock_monitor_cb=None):
347
    """Initializes this class.
348

349
    @param resolver: callable accepting a list of hostnames, returning a list
350
      of tuples containing name and IP address (IP address can be the name or
351
      the special value L{_OFFLINE} to mark offline machines)
352
    @type port: int
353
    @param port: TCP port
354
    @param lock_monitor_cb: Callable for registering with lock monitor
355

356
    """
357
    self._resolver = resolver
358
    self._port = port
359
    self._lock_monitor_cb = lock_monitor_cb
360

    
361
  @staticmethod
362
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
363
    """Prepares requests by sorting offline hosts into separate list.
364

365
    """
366
    results = {}
367
    requests = {}
368

    
369
    for (name, ip) in hosts:
370
      if ip is _OFFLINE:
371
        # Node is marked as offline
372
        results[name] = RpcResult(node=name, offline=True, call=procedure)
373
      else:
374
        requests[name] = \
375
          http.client.HttpClientRequest(str(ip), port,
376
                                        http.HTTP_PUT, str("/%s" % procedure),
377
                                        headers=_RPC_CLIENT_HEADERS,
378
                                        post_data=body,
379
                                        read_timeout=read_timeout,
380
                                        nicename="%s/%s" % (name, procedure),
381
                                        curl_config_fn=_ConfigRpcCurl)
382

    
383
    return (results, requests)
384

    
385
  @staticmethod
386
  def _CombineResults(results, requests, procedure):
387
    """Combines pre-computed results for offline hosts with actual call results.
388

389
    """
390
    for name, req in requests.items():
391
      if req.success and req.resp_status_code == http.HTTP_OK:
392
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393
                                node=name, call=procedure)
394
      else:
395
        # TODO: Better error reporting
396
        if req.error:
397
          msg = req.error
398
        else:
399
          msg = req.resp_body
400

    
401
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402
        host_result = RpcResult(data=msg, failed=True, node=name,
403
                                call=procedure)
404

    
405
      results[name] = host_result
406

    
407
    return results
408

    
409
  def __call__(self, hosts, procedure, body, read_timeout=None,
410
               _req_process_fn=http.client.ProcessRequests):
411
    """Makes an RPC request to a number of nodes.
412

413
    @type hosts: sequence
414
    @param hosts: Hostnames
415
    @type procedure: string
416
    @param procedure: Request path
417
    @type body: string
418
    @param body: Request body
419
    @type read_timeout: int or None
420
    @param read_timeout: Read timeout for request
421

422
    """
423
    if read_timeout is None:
424
      read_timeout = _TIMEOUTS.get(procedure, None)
425

    
426
    assert read_timeout is not None, \
427
      "Missing RPC read timeout for procedure '%s'" % procedure
428

    
429
    (results, requests) = \
430
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
431
                            str(body), read_timeout)
432

    
433
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
434

    
435
    assert not frozenset(results).intersection(requests)
436

    
437
    return self._CombineResults(results, requests, procedure)
438

    
439

    
440
class RpcRunner(_generated_rpc.RpcClientDefault):
441
  """RPC runner class.
442

443
  """
444
  def __init__(self, context):
445
    """Initialized the RPC runner.
446

447
    @type context: C{masterd.GanetiContext}
448
    @param context: Ganeti context
449

450
    """
451
    _generated_rpc.RpcClientDefault.__init__(self)
452

    
453
    self._cfg = context.cfg
454
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
455
                                              self._cfg.GetNodeInfo,
456
                                              self._cfg.GetAllNodesInfo),
457
                               netutils.GetDaemonPort(constants.NODED),
458
                               lock_monitor_cb=context.glm.AddToLockMonitor)
459

    
460
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
461
    """Convert the given instance to a dict.
462

463
    This is done via the instance's ToDict() method and additionally
464
    we fill the hvparams with the cluster defaults.
465

466
    @type instance: L{objects.Instance}
467
    @param instance: an Instance object
468
    @type hvp: dict or None
469
    @param hvp: a dictionary with overridden hypervisor parameters
470
    @type bep: dict or None
471
    @param bep: a dictionary with overridden backend parameters
472
    @type osp: dict or None
473
    @param osp: a dictionary with overridden os parameters
474
    @rtype: dict
475
    @return: the instance dict, with the hvparams filled with the
476
        cluster defaults
477

478
    """
479
    idict = instance.ToDict()
480
    cluster = self._cfg.GetClusterInfo()
481
    idict["hvparams"] = cluster.FillHV(instance)
482
    if hvp is not None:
483
      idict["hvparams"].update(hvp)
484
    idict["beparams"] = cluster.FillBE(instance)
485
    if bep is not None:
486
      idict["beparams"].update(bep)
487
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
488
    if osp is not None:
489
      idict["osparams"].update(osp)
490
    for nic in idict["nics"]:
491
      nic['nicparams'] = objects.FillDict(
492
        cluster.nicparams[constants.PP_DEFAULT],
493
        nic['nicparams'])
494
    return idict
495

    
496
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
497
    """Helper for making a multi-node call
498

499
    """
500
    body = serializer.DumpJson(args, indent=False)
501
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
502

    
503
  def _Call(self, node_list, procedure, timeout, args):
504
    """Entry point for automatically generated RPC wrappers.
505

506
    """
507
    return self._MultiNodeCall(node_list, procedure, args, read_timeout=timeout)
508

    
509
  @staticmethod
510
  def _StaticMultiNodeCall(node_list, procedure, args,
511
                           address_list=None, read_timeout=None):
512
    """Helper for making a multi-node static call
513

514
    """
515
    body = serializer.DumpJson(args, indent=False)
516

    
517
    if address_list is None:
518
      resolver = _SsconfResolver
519
    else:
520
      # Caller provided an address list
521
      resolver = _StaticResolver(address_list)
522

    
523
    proc = _RpcProcessor(resolver,
524
                         netutils.GetDaemonPort(constants.NODED))
525
    return proc(node_list, procedure, body, read_timeout=read_timeout)
526

    
527
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
528
    """Helper for making a single-node call
529

530
    """
531
    body = serializer.DumpJson(args, indent=False)
532
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
533

    
534
  @classmethod
535
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
536
    """Helper for making a single-node static call
537

538
    """
539
    body = serializer.DumpJson(args, indent=False)
540
    proc = _RpcProcessor(_SsconfResolver,
541
                         netutils.GetDaemonPort(constants.NODED))
542
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
543

    
544
  @staticmethod
545
  def _BlockdevFindPostProc(result):
546
    if not result.fail_msg and result.payload is not None:
547
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
548
    return result
549

    
550
  @staticmethod
551
  def _BlockdevGetMirrorStatusPostProc(result):
552
    if not result.fail_msg:
553
      result.payload = [objects.BlockDevStatus.FromDict(i)
554
                        for i in result.payload]
555
    return result
556

    
557
  @staticmethod
558
  def _BlockdevGetMirrorStatusMultiPostProc(result):
559
    for nres in result.values():
560
      if nres.fail_msg:
561
        continue
562

    
563
      for idx, (success, status) in enumerate(nres.payload):
564
        if success:
565
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
566

    
567
    return result
568

    
569
  @staticmethod
570
  def _OsGetPostProc(result):
571
    if not result.fail_msg and isinstance(result.payload, dict):
572
      result.payload = objects.OS.FromDict(result.payload)
573
    return result
574

    
575
  @staticmethod
576
  def _PrepareFinalizeExportDisks(snap_disks):
577
    flat_disks = []
578

    
579
    for disk in snap_disks:
580
      if isinstance(disk, bool):
581
        flat_disks.append(disk)
582
      else:
583
        flat_disks.append(disk.ToDict())
584

    
585
    return flat_disks
586

    
587
  @staticmethod
588
  def _ImpExpStatusPostProc(result):
589
    """Post-processor for import/export status.
590

591
    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
592
    @return: Returns a list of the state of each named import/export or None if
593
             a status couldn't be retrieved
594

595
    """
596
    if not result.fail_msg:
597
      decoded = []
598

    
599
      for i in result.payload:
600
        if i is None:
601
          decoded.append(None)
602
          continue
603
        decoded.append(objects.ImportExportStatus.FromDict(i))
604

    
605
      result.payload = decoded
606

    
607
    return result
608

    
609
  @staticmethod
610
  def _EncodeImportExportIO(ieio, ieioargs):
611
    """Encodes import/export I/O information.
612

613
    """
614
    if ieio == constants.IEIO_RAW_DISK:
615
      assert len(ieioargs) == 1
616
      return (ieioargs[0].ToDict(), )
617

    
618
    if ieio == constants.IEIO_SCRIPT:
619
      assert len(ieioargs) == 2
620
      return (ieioargs[0].ToDict(), ieioargs[1])
621

    
622
    return ieioargs
623

    
624
  #
625
  # Begin RPC calls
626
  #
627

    
628
  @_RpcTimeout(_TMO_NORMAL)
629
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
630
    """Starts an instance.
631

632
    This is a single-node call.
633

634
    """
635
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
636
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
637

    
638
  @_RpcTimeout(_TMO_1DAY)
639
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
640
    """Installs an OS on the given instance.
641

642
    This is a single-node call.
643

644
    """
645
    return self._SingleNodeCall(node, "instance_os_add",
646
                                [self._InstDict(inst, osp=osparams),
647
                                 reinstall, debug])
648

    
649
  @classmethod
650
  @_RpcTimeout(_TMO_FAST)
651
  def call_node_start_master_daemons(cls, node, no_voting):
652
    """Starts master daemons on a node.
653

654
    This is a single-node call.
655

656
    """
657
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
658
                                     [no_voting])
659

    
660
  @classmethod
661
  @_RpcTimeout(_TMO_FAST)
662
  def call_node_activate_master_ip(cls, node):
663
    """Activates master IP on a node.
664

665
    This is a single-node call.
666

667
    """
668
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
669

    
670
  @classmethod
671
  @_RpcTimeout(_TMO_FAST)
672
  def call_node_stop_master(cls, node):
673
    """Deactivates master IP and stops master daemons on a node.
674

675
    This is a single-node call.
676

677
    """
678
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
679

    
680
  @classmethod
681
  @_RpcTimeout(_TMO_FAST)
682
  def call_node_deactivate_master_ip(cls, node):
683
    """Deactivates master IP on a node.
684

685
    This is a single-node call.
686

687
    """
688
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
689

    
690
  @classmethod
691
  @_RpcTimeout(_TMO_FAST)
692
  def call_node_change_master_netmask(cls, node, netmask):
693
    """Change master IP netmask.
694

695
    This is a single-node call.
696

697
    """
698
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
699
                  [netmask])
700

    
701
  @classmethod
702
  @_RpcTimeout(_TMO_URGENT)
703
  def call_master_info(cls, node_list):
704
    """Query master info.
705

706
    This is a multi-node call.
707

708
    """
709
    # TODO: should this method query down nodes?
710
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
711

    
712
  @classmethod
713
  @_RpcTimeout(_TMO_URGENT)
714
  def call_version(cls, node_list):
715
    """Query node version.
716

717
    This is a multi-node call.
718

719
    """
720
    return cls._StaticMultiNodeCall(node_list, "version", [])
721

    
722
  @classmethod
723
  @_RpcTimeout(_TMO_NORMAL)
724
  def call_upload_file(cls, node_list, file_name, address_list=None):
725
    """Upload a file.
726

727
    The node will refuse the operation in case the file is not on the
728
    approved file list.
729

730
    This is a multi-node call.
731

732
    @type node_list: list
733
    @param node_list: the list of node names to upload to
734
    @type file_name: str
735
    @param file_name: the filename to upload
736
    @type address_list: list or None
737
    @keyword address_list: an optional list of node addresses, in order
738
        to optimize the RPC speed
739

740
    """
741
    file_contents = utils.ReadFile(file_name)
742
    data = _Compress(file_contents)
743
    st = os.stat(file_name)
744
    getents = runtime.GetEnts()
745
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
746
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
747
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
748
                                    address_list=address_list)
749

    
750
  @classmethod
751
  @_RpcTimeout(_TMO_NORMAL)
752
  def call_write_ssconf_files(cls, node_list, values):
753
    """Write ssconf files.
754

755
    This is a multi-node call.
756

757
    """
758
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
759

    
760
  @classmethod
761
  @_RpcTimeout(_TMO_NORMAL)
762
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
763
    """Requests a node to clean the cluster information it has.
764

765
    This will remove the configuration information from the ganeti data
766
    dir.
767

768
    This is a single-node call.
769

770
    """
771
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
772
                                     [modify_ssh_setup])
773

    
774
  def call_test_delay(self, node_list, duration, read_timeout=None):
775
    """Sleep for a fixed time on given node(s).
776

777
    This is a multi-node call.
778

779
    """
780
    assert read_timeout is None
781
    return self.call_test_delay(node_list, duration,
782
                                read_timeout=int(duration + 5))
783

    
784
  @_RpcTimeout(_TMO_NORMAL)
785
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
786
    """Validate the hypervisor params.
787

788
    This is a multi-node call.
789

790
    @type node_list: list
791
    @param node_list: the list of nodes to query
792
    @type hvname: string
793
    @param hvname: the hypervisor name
794
    @type hvparams: dict
795
    @param hvparams: the hypervisor parameters to be validated
796

797
    """
798
    cluster = self._cfg.GetClusterInfo()
799
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
800
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
801
                               [hvname, hv_full])
802

    
803

    
804
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
805
  """RPC wrappers for job queue.
806

807
  """
808
  _Compress = staticmethod(_Compress)
809

    
810
  def __init__(self, context, address_list):
811
    """Initializes this class.
812

813
    """
814
    _generated_rpc.RpcClientJobQueue.__init__(self)
815

    
816
    if address_list is None:
817
      resolver = _SsconfResolver
818
    else:
819
      # Caller provided an address list
820
      resolver = _StaticResolver(address_list)
821

    
822
    self._proc = _RpcProcessor(resolver,
823
                               netutils.GetDaemonPort(constants.NODED),
824
                               lock_monitor_cb=context.glm.AddToLockMonitor)
825

    
826
  def _Call(self, node_list, procedure, timeout, args):
827
    """Entry point for automatically generated RPC wrappers.
828

829
    """
830
    body = serializer.DumpJson(args, indent=False)
831

    
832
    return self._proc(node_list, procedure, body, read_timeout=timeout)