Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ e0036155

History | View | Annotate | Download (38.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager # pylint: disable-msg=W0603
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  http.InitSsl()
64

    
65
  _http_manager = http.client.HttpClientManager()
66

    
67

    
68
def Shutdown():
69
  """Stops the module-global HTTP client manager.
70

71
  Must be called before quitting the program.
72

73
  """
74
  global _http_manager # pylint: disable-msg=W0603
75

    
76
  if _http_manager:
77
    _http_manager.Shutdown()
78
    _http_manager = None
79

    
80

    
81
class RpcResult(object):
82
  """RPC Result class.
83

84
  This class holds an RPC result. It is needed since in multi-node
85
  calls we can't raise an exception just because one one out of many
86
  failed, and therefore we use this class to encapsulate the result.
87

88
  @ivar data: the data payload, for successful results, or None
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.offline = offline
101
    self.call = call
102
    self.node = node
103

    
104
    if offline:
105
      self.fail_msg = "Node is marked offline"
106
      self.data = self.payload = None
107
    elif failed:
108
      self.fail_msg = self._EnsureErr(data)
109
      self.data = self.payload = None
110
    else:
111
      self.data = data
112
      if not isinstance(self.data, (tuple, list)):
113
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114
                         type(self.data))
115
        self.payload = None
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
        self.payload = None
120
      elif not self.data[0]:
121
        self.fail_msg = self._EnsureErr(self.data[1])
122
        self.payload = None
123
      else:
124
        # finally success
125
        self.fail_msg = None
126
        self.payload = data[1]
127

    
128
    assert hasattr(self, "call")
129
    assert hasattr(self, "data")
130
    assert hasattr(self, "fail_msg")
131
    assert hasattr(self, "node")
132
    assert hasattr(self, "offline")
133
    assert hasattr(self, "payload")
134

    
135
  @staticmethod
136
  def _EnsureErr(val):
137
    """Helper to ensure we return a 'True' value for error."""
138
    if val:
139
      return val
140
    else:
141
      return "No error information"
142

    
143
  def Raise(self, msg, prereq=False, ecode=None):
144
    """If the result has failed, raise an OpExecError.
145

146
    This is used so that LU code doesn't have to check for each
147
    result, but instead can call this function.
148

149
    """
150
    if not self.fail_msg:
151
      return
152

    
153
    if not msg: # one could pass None for default message
154
      msg = ("Call '%s' to node '%s' has failed: %s" %
155
             (self.call, self.node, self.fail_msg))
156
    else:
157
      msg = "%s: %s" % (msg, self.fail_msg)
158
    if prereq:
159
      ec = errors.OpPrereqError
160
    else:
161
      ec = errors.OpExecError
162
    if ecode is not None:
163
      args = (msg, prereq)
164
    else:
165
      args = (msg, )
166
    raise ec(*args) # pylint: disable-msg=W0142
167

    
168

    
169
class Client:
170
  """RPC Client class.
171

172
  This class, given a (remote) method name, a list of parameters and a
173
  list of nodes, will contact (in parallel) all nodes, and return a
174
  dict of results (key: node name, value: result).
175

176
  One current bug is that generic failure is still signaled by
177
  'False' result, which is not good. This overloading of values can
178
  cause bugs.
179

180
  """
181
  def __init__(self, procedure, body, port):
182
    self.procedure = procedure
183
    self.body = body
184
    self.port = port
185
    self.nc = {}
186

    
187
    self._ssl_params = \
188
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189
                         ssl_cert_path=constants.NODED_CERT_FILE)
190

    
191
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
192
    """Add a list of nodes to the target nodes.
193

194
    @type node_list: list
195
    @param node_list: the list of node names to connect
196
    @type address_list: list or None
197
    @keyword address_list: either None or a list with node addresses,
198
        which must have the same length as the node list
199
    @type read_timeout: int
200
    @param read_timeout: overwrites the default read timeout for the
201
        given operation
202

203
    """
204
    if address_list is None:
205
      address_list = [None for _ in node_list]
206
    else:
207
      assert len(node_list) == len(address_list), \
208
             "Name and address lists should have the same length"
209
    for node, address in zip(node_list, address_list):
210
      self.ConnectNode(node, address, read_timeout=read_timeout)
211

    
212
  def ConnectNode(self, name, address=None, read_timeout=None):
213
    """Add a node to the target list.
214

215
    @type name: str
216
    @param name: the node name
217
    @type address: str
218
    @keyword address: the node address, if known
219

220
    """
221
    if address is None:
222
      address = name
223

    
224
    self.nc[name] = \
225
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
226
                                    "/%s" % self.procedure,
227
                                    post_data=self.body,
228
                                    ssl_params=self._ssl_params,
229
                                    ssl_verify_peer=True,
230
                                    read_timeout=read_timeout)
231

    
232
  def GetResults(self):
233
    """Call nodes and return results.
234

235
    @rtype: list
236
    @return: List of RPC results
237

238
    """
239
    assert _http_manager, "RPC module not initialized"
240

    
241
    _http_manager.ExecRequests(self.nc.values())
242

    
243
    results = {}
244

    
245
    for name, req in self.nc.iteritems():
246
      if req.success and req.resp_status_code == http.HTTP_OK:
247
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
248
                                  node=name, call=self.procedure)
249
        continue
250

    
251
      # TODO: Better error reporting
252
      if req.error:
253
        msg = req.error
254
      else:
255
        msg = req.resp_body
256

    
257
      logging.error("RPC error in %s from node %s: %s",
258
                    self.procedure, name, msg)
259
      results[name] = RpcResult(data=msg, failed=True, node=name,
260
                                call=self.procedure)
261

    
262
    return results
263

    
264

    
265
def _EncodeImportExportIO(ieio, ieioargs):
266
  """Encodes import/export I/O information.
267

268
  """
269
  if ieio == constants.IEIO_RAW_DISK:
270
    assert len(ieioargs) == 1
271
    return (ieioargs[0].ToDict(), )
272

    
273
  if ieio == constants.IEIO_SCRIPT:
274
    assert len(ieioargs) == 2
275
    return (ieioargs[0].ToDict(), ieioargs[1])
276

    
277
  return ieioargs
278

    
279

    
280
class RpcRunner(object):
281
  """RPC runner class"""
282

    
283
  def __init__(self, cfg):
284
    """Initialized the rpc runner.
285

286
    @type cfg:  C{config.ConfigWriter}
287
    @param cfg: the configuration object that will be used to get data
288
                about the cluster
289

290
    """
291
    self._cfg = cfg
292
    self.port = utils.GetDaemonPort(constants.NODED)
293

    
294
  def _InstDict(self, instance, hvp=None, bep=None):
295
    """Convert the given instance to a dict.
296

297
    This is done via the instance's ToDict() method and additionally
298
    we fill the hvparams with the cluster defaults.
299

300
    @type instance: L{objects.Instance}
301
    @param instance: an Instance object
302
    @type hvp: dict or None
303
    @param hvp: a dictionary with overridden hypervisor parameters
304
    @type bep: dict or None
305
    @param bep: a dictionary with overridden backend parameters
306
    @rtype: dict
307
    @return: the instance dict, with the hvparams filled with the
308
        cluster defaults
309

310
    """
311
    idict = instance.ToDict()
312
    cluster = self._cfg.GetClusterInfo()
313
    idict["hvparams"] = cluster.FillHV(instance)
314
    if hvp is not None:
315
      idict["hvparams"].update(hvp)
316
    idict["beparams"] = cluster.FillBE(instance)
317
    if bep is not None:
318
      idict["beparams"].update(bep)
319
    for nic in idict["nics"]:
320
      nic['nicparams'] = objects.FillDict(
321
        cluster.nicparams[constants.PP_DEFAULT],
322
        nic['nicparams'])
323
    return idict
324

    
325
  def _ConnectList(self, client, node_list, call, read_timeout=None):
326
    """Helper for computing node addresses.
327

328
    @type client: L{ganeti.rpc.Client}
329
    @param client: a C{Client} instance
330
    @type node_list: list
331
    @param node_list: the node list we should connect
332
    @type call: string
333
    @param call: the name of the remote procedure call, for filling in
334
        correctly any eventual offline nodes' results
335
    @type read_timeout: int
336
    @param read_timeout: overwrites the default read timeout for the
337
        given operation
338

339
    """
340
    all_nodes = self._cfg.GetAllNodesInfo()
341
    name_list = []
342
    addr_list = []
343
    skip_dict = {}
344
    for node in node_list:
345
      if node in all_nodes:
346
        if all_nodes[node].offline:
347
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
348
          continue
349
        val = all_nodes[node].primary_ip
350
      else:
351
        val = None
352
      addr_list.append(val)
353
      name_list.append(node)
354
    if name_list:
355
      client.ConnectList(name_list, address_list=addr_list,
356
                         read_timeout=read_timeout)
357
    return skip_dict
358

    
359
  def _ConnectNode(self, client, node, call, read_timeout=None):
360
    """Helper for computing one node's address.
361

362
    @type client: L{ganeti.rpc.Client}
363
    @param client: a C{Client} instance
364
    @type node: str
365
    @param node: the node we should connect
366
    @type call: string
367
    @param call: the name of the remote procedure call, for filling in
368
        correctly any eventual offline nodes' results
369
    @type read_timeout: int
370
    @param read_timeout: overwrites the default read timeout for the
371
        given operation
372

373
    """
374
    node_info = self._cfg.GetNodeInfo(node)
375
    if node_info is not None:
376
      if node_info.offline:
377
        return RpcResult(node=node, offline=True, call=call)
378
      addr = node_info.primary_ip
379
    else:
380
      addr = None
381
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
382

    
383
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
384
    """Helper for making a multi-node call
385

386
    """
387
    body = serializer.DumpJson(args, indent=False)
388
    c = Client(procedure, body, self.port)
389
    skip_dict = self._ConnectList(c, node_list, procedure,
390
                                  read_timeout=read_timeout)
391
    skip_dict.update(c.GetResults())
392
    return skip_dict
393

    
394
  @classmethod
395
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
396
                           address_list=None, read_timeout=None):
397
    """Helper for making a multi-node static call
398

399
    """
400
    body = serializer.DumpJson(args, indent=False)
401
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
402
    c.ConnectList(node_list, address_list=address_list,
403
                  read_timeout=read_timeout)
404
    return c.GetResults()
405

    
406
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
407
    """Helper for making a single-node call
408

409
    """
410
    body = serializer.DumpJson(args, indent=False)
411
    c = Client(procedure, body, self.port)
412
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
413
    if result is None:
414
      # we did connect, node is not offline
415
      result = c.GetResults()[node]
416
    return result
417

    
418
  @classmethod
419
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
420
    """Helper for making a single-node static call
421

422
    """
423
    body = serializer.DumpJson(args, indent=False)
424
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
425
    c.ConnectNode(node, read_timeout=read_timeout)
426
    return c.GetResults()[node]
427

    
428
  @staticmethod
429
  def _Compress(data):
430
    """Compresses a string for transport over RPC.
431

432
    Small amounts of data are not compressed.
433

434
    @type data: str
435
    @param data: Data
436
    @rtype: tuple
437
    @return: Encoded data to send
438

439
    """
440
    # Small amounts of data are not compressed
441
    if len(data) < 512:
442
      return (constants.RPC_ENCODING_NONE, data)
443

    
444
    # Compress with zlib and encode in base64
445
    return (constants.RPC_ENCODING_ZLIB_BASE64,
446
            base64.b64encode(zlib.compress(data, 3)))
447

    
448
  #
449
  # Begin RPC calls
450
  #
451

    
452
  def call_lv_list(self, node_list, vg_name):
453
    """Gets the logical volumes present in a given volume group.
454

455
    This is a multi-node call.
456

457
    """
458
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
459

    
460
  def call_vg_list(self, node_list):
461
    """Gets the volume group list.
462

463
    This is a multi-node call.
464

465
    """
466
    return self._MultiNodeCall(node_list, "vg_list", [])
467

    
468
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
469
    """Get list of storage units.
470

471
    This is a multi-node call.
472

473
    """
474
    return self._MultiNodeCall(node_list, "storage_list",
475
                               [su_name, su_args, name, fields])
476

    
477
  def call_storage_modify(self, node, su_name, su_args, name, changes):
478
    """Modify a storage unit.
479

480
    This is a single-node call.
481

482
    """
483
    return self._SingleNodeCall(node, "storage_modify",
484
                                [su_name, su_args, name, changes])
485

    
486
  def call_storage_execute(self, node, su_name, su_args, name, op):
487
    """Executes an operation on a storage unit.
488

489
    This is a single-node call.
490

491
    """
492
    return self._SingleNodeCall(node, "storage_execute",
493
                                [su_name, su_args, name, op])
494

    
495
  def call_bridges_exist(self, node, bridges_list):
496
    """Checks if a node has all the bridges given.
497

498
    This method checks if all bridges given in the bridges_list are
499
    present on the remote node, so that an instance that uses interfaces
500
    on those bridges can be started.
501

502
    This is a single-node call.
503

504
    """
505
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
506

    
507
  def call_instance_start(self, node, instance, hvp, bep):
508
    """Starts an instance.
509

510
    This is a single-node call.
511

512
    """
513
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
514
    return self._SingleNodeCall(node, "instance_start", [idict])
515

    
516
  def call_instance_shutdown(self, node, instance, timeout):
517
    """Stops an instance.
518

519
    This is a single-node call.
520

521
    """
522
    return self._SingleNodeCall(node, "instance_shutdown",
523
                                [self._InstDict(instance), timeout])
524

    
525
  def call_migration_info(self, node, instance):
526
    """Gather the information necessary to prepare an instance migration.
527

528
    This is a single-node call.
529

530
    @type node: string
531
    @param node: the node on which the instance is currently running
532
    @type instance: C{objects.Instance}
533
    @param instance: the instance definition
534

535
    """
536
    return self._SingleNodeCall(node, "migration_info",
537
                                [self._InstDict(instance)])
538

    
539
  def call_accept_instance(self, node, instance, info, target):
540
    """Prepare a node to accept an instance.
541

542
    This is a single-node call.
543

544
    @type node: string
545
    @param node: the target node for the migration
546
    @type instance: C{objects.Instance}
547
    @param instance: the instance definition
548
    @type info: opaque/hypervisor specific (string/data)
549
    @param info: result for the call_migration_info call
550
    @type target: string
551
    @param target: target hostname (usually ip address) (on the node itself)
552

553
    """
554
    return self._SingleNodeCall(node, "accept_instance",
555
                                [self._InstDict(instance), info, target])
556

    
557
  def call_finalize_migration(self, node, instance, info, success):
558
    """Finalize any target-node migration specific operation.
559

560
    This is called both in case of a successful migration and in case of error
561
    (in which case it should abort the migration).
562

563
    This is a single-node call.
564

565
    @type node: string
566
    @param node: the target node for the migration
567
    @type instance: C{objects.Instance}
568
    @param instance: the instance definition
569
    @type info: opaque/hypervisor specific (string/data)
570
    @param info: result for the call_migration_info call
571
    @type success: boolean
572
    @param success: whether the migration was a success or a failure
573

574
    """
575
    return self._SingleNodeCall(node, "finalize_migration",
576
                                [self._InstDict(instance), info, success])
577

    
578
  def call_instance_migrate(self, node, instance, target, live):
579
    """Migrate an instance.
580

581
    This is a single-node call.
582

583
    @type node: string
584
    @param node: the node on which the instance is currently running
585
    @type instance: C{objects.Instance}
586
    @param instance: the instance definition
587
    @type target: string
588
    @param target: the target node name
589
    @type live: boolean
590
    @param live: whether the migration should be done live or not (the
591
        interpretation of this parameter is left to the hypervisor)
592

593
    """
594
    return self._SingleNodeCall(node, "instance_migrate",
595
                                [self._InstDict(instance), target, live])
596

    
597
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
598
    """Reboots an instance.
599

600
    This is a single-node call.
601

602
    """
603
    return self._SingleNodeCall(node, "instance_reboot",
604
                                [self._InstDict(inst), reboot_type,
605
                                 shutdown_timeout])
606

    
607
  def call_instance_os_add(self, node, inst, reinstall, debug):
608
    """Installs an OS on the given instance.
609

610
    This is a single-node call.
611

612
    """
613
    return self._SingleNodeCall(node, "instance_os_add",
614
                                [self._InstDict(inst), reinstall, debug])
615

    
616
  def call_instance_run_rename(self, node, inst, old_name, debug):
617
    """Run the OS rename script for an instance.
618

619
    This is a single-node call.
620

621
    """
622
    return self._SingleNodeCall(node, "instance_run_rename",
623
                                [self._InstDict(inst), old_name, debug])
624

    
625
  def call_instance_info(self, node, instance, hname):
626
    """Returns information about a single instance.
627

628
    This is a single-node call.
629

630
    @type node: list
631
    @param node: the list of nodes to query
632
    @type instance: string
633
    @param instance: the instance name
634
    @type hname: string
635
    @param hname: the hypervisor type of the instance
636

637
    """
638
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
639

    
640
  def call_instance_migratable(self, node, instance):
641
    """Checks whether the given instance can be migrated.
642

643
    This is a single-node call.
644

645
    @param node: the node to query
646
    @type instance: L{objects.Instance}
647
    @param instance: the instance to check
648

649

650
    """
651
    return self._SingleNodeCall(node, "instance_migratable",
652
                                [self._InstDict(instance)])
653

    
654
  def call_all_instances_info(self, node_list, hypervisor_list):
655
    """Returns information about all instances on the given nodes.
656

657
    This is a multi-node call.
658

659
    @type node_list: list
660
    @param node_list: the list of nodes to query
661
    @type hypervisor_list: list
662
    @param hypervisor_list: the hypervisors to query for instances
663

664
    """
665
    return self._MultiNodeCall(node_list, "all_instances_info",
666
                               [hypervisor_list])
667

    
668
  def call_instance_list(self, node_list, hypervisor_list):
669
    """Returns the list of running instances on a given node.
670

671
    This is a multi-node call.
672

673
    @type node_list: list
674
    @param node_list: the list of nodes to query
675
    @type hypervisor_list: list
676
    @param hypervisor_list: the hypervisors to query for instances
677

678
    """
679
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
680

    
681
  def call_node_tcp_ping(self, node, source, target, port, timeout,
682
                         live_port_needed):
683
    """Do a TcpPing on the remote node
684

685
    This is a single-node call.
686

687
    """
688
    return self._SingleNodeCall(node, "node_tcp_ping",
689
                                [source, target, port, timeout,
690
                                 live_port_needed])
691

    
692
  def call_node_has_ip_address(self, node, address):
693
    """Checks if a node has the given IP address.
694

695
    This is a single-node call.
696

697
    """
698
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
699

    
700
  def call_node_info(self, node_list, vg_name, hypervisor_type):
701
    """Return node information.
702

703
    This will return memory information and volume group size and free
704
    space.
705

706
    This is a multi-node call.
707

708
    @type node_list: list
709
    @param node_list: the list of nodes to query
710
    @type vg_name: C{string}
711
    @param vg_name: the name of the volume group to ask for disk space
712
        information
713
    @type hypervisor_type: C{str}
714
    @param hypervisor_type: the name of the hypervisor to ask for
715
        memory information
716

717
    """
718
    return self._MultiNodeCall(node_list, "node_info",
719
                               [vg_name, hypervisor_type])
720

    
721
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
722
    """Add a node to the cluster.
723

724
    This is a single-node call.
725

726
    """
727
    return self._SingleNodeCall(node, "node_add",
728
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
729

    
730
  def call_node_verify(self, node_list, checkdict, cluster_name):
731
    """Request verification of given parameters.
732

733
    This is a multi-node call.
734

735
    """
736
    return self._MultiNodeCall(node_list, "node_verify",
737
                               [checkdict, cluster_name])
738

    
739
  @classmethod
740
  def call_node_start_master(cls, node, start_daemons, no_voting):
741
    """Tells a node to activate itself as a master.
742

743
    This is a single-node call.
744

745
    """
746
    return cls._StaticSingleNodeCall(node, "node_start_master",
747
                                     [start_daemons, no_voting])
748

    
749
  @classmethod
750
  def call_node_stop_master(cls, node, stop_daemons):
751
    """Tells a node to demote itself from master status.
752

753
    This is a single-node call.
754

755
    """
756
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
757

    
758
  @classmethod
759
  def call_master_info(cls, node_list):
760
    """Query master info.
761

762
    This is a multi-node call.
763

764
    """
765
    # TODO: should this method query down nodes?
766
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
767

    
768
  @classmethod
769
  def call_version(cls, node_list):
770
    """Query node version.
771

772
    This is a multi-node call.
773

774
    """
775
    return cls._StaticMultiNodeCall(node_list, "version", [])
776

    
777
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
778
    """Request creation of a given block device.
779

780
    This is a single-node call.
781

782
    """
783
    return self._SingleNodeCall(node, "blockdev_create",
784
                                [bdev.ToDict(), size, owner, on_primary, info])
785

    
786
  def call_blockdev_remove(self, node, bdev):
787
    """Request removal of a given block device.
788

789
    This is a single-node call.
790

791
    """
792
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
793

    
794
  def call_blockdev_rename(self, node, devlist):
795
    """Request rename of the given block devices.
796

797
    This is a single-node call.
798

799
    """
800
    return self._SingleNodeCall(node, "blockdev_rename",
801
                                [(d.ToDict(), uid) for d, uid in devlist])
802

    
803
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
804
    """Request assembling of a given block device.
805

806
    This is a single-node call.
807

808
    """
809
    return self._SingleNodeCall(node, "blockdev_assemble",
810
                                [disk.ToDict(), owner, on_primary])
811

    
812
  def call_blockdev_shutdown(self, node, disk):
813
    """Request shutdown of a given block device.
814

815
    This is a single-node call.
816

817
    """
818
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
819

    
820
  def call_blockdev_addchildren(self, node, bdev, ndevs):
821
    """Request adding a list of children to a (mirroring) device.
822

823
    This is a single-node call.
824

825
    """
826
    return self._SingleNodeCall(node, "blockdev_addchildren",
827
                                [bdev.ToDict(),
828
                                 [disk.ToDict() for disk in ndevs]])
829

    
830
  def call_blockdev_removechildren(self, node, bdev, ndevs):
831
    """Request removing a list of children from a (mirroring) device.
832

833
    This is a single-node call.
834

835
    """
836
    return self._SingleNodeCall(node, "blockdev_removechildren",
837
                                [bdev.ToDict(),
838
                                 [disk.ToDict() for disk in ndevs]])
839

    
840
  def call_blockdev_getmirrorstatus(self, node, disks):
841
    """Request status of a (mirroring) device.
842

843
    This is a single-node call.
844

845
    """
846
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
847
                                  [dsk.ToDict() for dsk in disks])
848
    if not result.fail_msg:
849
      result.payload = [objects.BlockDevStatus.FromDict(i)
850
                        for i in result.payload]
851
    return result
852

    
853
  def call_blockdev_find(self, node, disk):
854
    """Request identification of a given block device.
855

856
    This is a single-node call.
857

858
    """
859
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
860
    if not result.fail_msg and result.payload is not None:
861
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
862
    return result
863

    
864
  def call_blockdev_close(self, node, instance_name, disks):
865
    """Closes the given block devices.
866

867
    This is a single-node call.
868

869
    """
870
    params = [instance_name, [cf.ToDict() for cf in disks]]
871
    return self._SingleNodeCall(node, "blockdev_close", params)
872

    
873
  def call_blockdev_getsizes(self, node, disks):
874
    """Returns the size of the given disks.
875

876
    This is a single-node call.
877

878
    """
879
    params = [[cf.ToDict() for cf in disks]]
880
    return self._SingleNodeCall(node, "blockdev_getsize", params)
881

    
882
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
883
    """Disconnects the network of the given drbd devices.
884

885
    This is a multi-node call.
886

887
    """
888
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
889
                               [nodes_ip, [cf.ToDict() for cf in disks]])
890

    
891
  def call_drbd_attach_net(self, node_list, nodes_ip,
892
                           disks, instance_name, multimaster):
893
    """Disconnects the given drbd devices.
894

895
    This is a multi-node call.
896

897
    """
898
    return self._MultiNodeCall(node_list, "drbd_attach_net",
899
                               [nodes_ip, [cf.ToDict() for cf in disks],
900
                                instance_name, multimaster])
901

    
902
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
903
    """Waits for the synchronization of drbd devices is complete.
904

905
    This is a multi-node call.
906

907
    """
908
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
909
                               [nodes_ip, [cf.ToDict() for cf in disks]])
910

    
911
  @classmethod
912
  def call_upload_file(cls, node_list, file_name, address_list=None):
913
    """Upload a file.
914

915
    The node will refuse the operation in case the file is not on the
916
    approved file list.
917

918
    This is a multi-node call.
919

920
    @type node_list: list
921
    @param node_list: the list of node names to upload to
922
    @type file_name: str
923
    @param file_name: the filename to upload
924
    @type address_list: list or None
925
    @keyword address_list: an optional list of node addresses, in order
926
        to optimize the RPC speed
927

928
    """
929
    file_contents = utils.ReadFile(file_name)
930
    data = cls._Compress(file_contents)
931
    st = os.stat(file_name)
932
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
933
              st.st_atime, st.st_mtime]
934
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
935
                                    address_list=address_list)
936

    
937
  @classmethod
938
  def call_write_ssconf_files(cls, node_list, values):
939
    """Write ssconf files.
940

941
    This is a multi-node call.
942

943
    """
944
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
945

    
946
  def call_os_diagnose(self, node_list):
947
    """Request a diagnose of OS definitions.
948

949
    This is a multi-node call.
950

951
    """
952
    return self._MultiNodeCall(node_list, "os_diagnose", [])
953

    
954
  def call_os_get(self, node, name):
955
    """Returns an OS definition.
956

957
    This is a single-node call.
958

959
    """
960
    result = self._SingleNodeCall(node, "os_get", [name])
961
    if not result.fail_msg and isinstance(result.payload, dict):
962
      result.payload = objects.OS.FromDict(result.payload)
963
    return result
964

    
965
  def call_hooks_runner(self, node_list, hpath, phase, env):
966
    """Call the hooks runner.
967

968
    Args:
969
      - op: the OpCode instance
970
      - env: a dictionary with the environment
971

972
    This is a multi-node call.
973

974
    """
975
    params = [hpath, phase, env]
976
    return self._MultiNodeCall(node_list, "hooks_runner", params)
977

    
978
  def call_iallocator_runner(self, node, name, idata):
979
    """Call an iallocator on a remote node
980

981
    Args:
982
      - name: the iallocator name
983
      - input: the json-encoded input string
984

985
    This is a single-node call.
986

987
    """
988
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
989

    
990
  def call_blockdev_grow(self, node, cf_bdev, amount):
991
    """Request a snapshot of the given block device.
992

993
    This is a single-node call.
994

995
    """
996
    return self._SingleNodeCall(node, "blockdev_grow",
997
                                [cf_bdev.ToDict(), amount])
998

    
999
  def call_blockdev_export(self, node, cf_bdev,
1000
                           dest_node, dest_path, cluster_name):
1001
    """Export a given disk to another node.
1002

1003
    This is a single-node call.
1004

1005
    """
1006
    return self._SingleNodeCall(node, "blockdev_export",
1007
                                [cf_bdev.ToDict(), dest_node, dest_path,
1008
                                 cluster_name])
1009

    
1010
  def call_blockdev_snapshot(self, node, cf_bdev):
1011
    """Request a snapshot of the given block device.
1012

1013
    This is a single-node call.
1014

1015
    """
1016
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1017

    
1018
  def call_finalize_export(self, node, instance, snap_disks):
1019
    """Request the completion of an export operation.
1020

1021
    This writes the export config file, etc.
1022

1023
    This is a single-node call.
1024

1025
    """
1026
    flat_disks = []
1027
    for disk in snap_disks:
1028
      if isinstance(disk, bool):
1029
        flat_disks.append(disk)
1030
      else:
1031
        flat_disks.append(disk.ToDict())
1032

    
1033
    return self._SingleNodeCall(node, "finalize_export",
1034
                                [self._InstDict(instance), flat_disks])
1035

    
1036
  def call_export_info(self, node, path):
1037
    """Queries the export information in a given path.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    return self._SingleNodeCall(node, "export_info", [path])
1043

    
1044
  def call_export_list(self, node_list):
1045
    """Gets the stored exports list.
1046

1047
    This is a multi-node call.
1048

1049
    """
1050
    return self._MultiNodeCall(node_list, "export_list", [])
1051

    
1052
  def call_export_remove(self, node, export):
1053
    """Requests removal of a given export.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    return self._SingleNodeCall(node, "export_remove", [export])
1059

    
1060
  @classmethod
1061
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1062
    """Requests a node to clean the cluster information it has.
1063

1064
    This will remove the configuration information from the ganeti data
1065
    dir.
1066

1067
    This is a single-node call.
1068

1069
    """
1070
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1071
                                     [modify_ssh_setup])
1072

    
1073
  def call_node_volumes(self, node_list):
1074
    """Gets all volumes on node(s).
1075

1076
    This is a multi-node call.
1077

1078
    """
1079
    return self._MultiNodeCall(node_list, "node_volumes", [])
1080

    
1081
  def call_node_demote_from_mc(self, node):
1082
    """Demote a node from the master candidate role.
1083

1084
    This is a single-node call.
1085

1086
    """
1087
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1088

    
1089
  def call_node_powercycle(self, node, hypervisor):
1090
    """Tries to powercycle a node.
1091

1092
    This is a single-node call.
1093

1094
    """
1095
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1096

    
1097
  def call_test_delay(self, node_list, duration):
1098
    """Sleep for a fixed time on given node(s).
1099

1100
    This is a multi-node call.
1101

1102
    """
1103
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1104

    
1105
  def call_file_storage_dir_create(self, node, file_storage_dir):
1106
    """Create the given file storage directory.
1107

1108
    This is a single-node call.
1109

1110
    """
1111
    return self._SingleNodeCall(node, "file_storage_dir_create",
1112
                                [file_storage_dir])
1113

    
1114
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1115
    """Remove the given file storage directory.
1116

1117
    This is a single-node call.
1118

1119
    """
1120
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1121
                                [file_storage_dir])
1122

    
1123
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1124
                                   new_file_storage_dir):
1125
    """Rename file storage directory.
1126

1127
    This is a single-node call.
1128

1129
    """
1130
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1131
                                [old_file_storage_dir, new_file_storage_dir])
1132

    
1133
  @classmethod
1134
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1135
    """Update job queue.
1136

1137
    This is a multi-node call.
1138

1139
    """
1140
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1141
                                    [file_name, cls._Compress(content)],
1142
                                    address_list=address_list)
1143

    
1144
  @classmethod
1145
  def call_jobqueue_purge(cls, node):
1146
    """Purge job queue.
1147

1148
    This is a single-node call.
1149

1150
    """
1151
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1152

    
1153
  @classmethod
1154
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1155
    """Rename a job queue file.
1156

1157
    This is a multi-node call.
1158

1159
    """
1160
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1161
                                    address_list=address_list)
1162

    
1163
  @classmethod
1164
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1165
    """Set the drain flag on the queue.
1166

1167
    This is a multi-node call.
1168

1169
    @type node_list: list
1170
    @param node_list: the list of nodes to query
1171
    @type drain_flag: bool
1172
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1173

1174
    """
1175
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1176
                                    [drain_flag])
1177

    
1178
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1179
    """Validate the hypervisor params.
1180

1181
    This is a multi-node call.
1182

1183
    @type node_list: list
1184
    @param node_list: the list of nodes to query
1185
    @type hvname: string
1186
    @param hvname: the hypervisor name
1187
    @type hvparams: dict
1188
    @param hvparams: the hypervisor parameters to be validated
1189

1190
    """
1191
    cluster = self._cfg.GetClusterInfo()
1192
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1193
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1194
                               [hvname, hv_full])
1195

    
1196
  def call_x509_cert_create(self, node, validity):
1197
    """Creates a new X509 certificate for SSL/TLS.
1198

1199
    This is a single-node call.
1200

1201
    @type validity: int
1202
    @param validity: Validity in seconds
1203

1204
    """
1205
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1206

    
1207
  def call_x509_cert_remove(self, node, name):
1208
    """Removes a X509 certificate.
1209

1210
    This is a single-node call.
1211

1212
    @type name: string
1213
    @param name: Certificate name
1214

1215
    """
1216
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1217

    
1218
  def call_import_start(self, node, opts, instance, dest, dest_args):
1219
    """Starts a listener for an import.
1220

1221
    This is a single-node call.
1222

1223
    @type node: string
1224
    @param node: Node name
1225
    @type instance: C{objects.Instance}
1226
    @param instance: Instance object
1227

1228
    """
1229
    return self._SingleNodeCall(node, "import_start",
1230
                                [opts.ToDict(),
1231
                                 self._InstDict(instance), dest,
1232
                                 _EncodeImportExportIO(dest, dest_args)])
1233

    
1234
  def call_export_start(self, node, opts, host, port,
1235
                        instance, source, source_args):
1236
    """Starts an export daemon.
1237

1238
    This is a single-node call.
1239

1240
    @type node: string
1241
    @param node: Node name
1242
    @type instance: C{objects.Instance}
1243
    @param instance: Instance object
1244

1245
    """
1246
    return self._SingleNodeCall(node, "export_start",
1247
                                [opts.ToDict(), host, port,
1248
                                 self._InstDict(instance), source,
1249
                                 _EncodeImportExportIO(source, source_args)])
1250

    
1251
  def call_impexp_status(self, node, names):
1252
    """Gets the status of an import or export.
1253

1254
    This is a single-node call.
1255

1256
    @type node: string
1257
    @param node: Node name
1258
    @type names: List of strings
1259
    @param names: Import/export names
1260
    @rtype: List of L{objects.ImportExportStatus} instances
1261
    @return: Returns a list of the state of each named import/export or None if
1262
             a status couldn't be retrieved
1263

1264
    """
1265
    result = self._SingleNodeCall(node, "impexp_status", [names])
1266

    
1267
    if not result.fail_msg:
1268
      decoded = []
1269

    
1270
      for i in result.payload:
1271
        if i is None:
1272
          decoded.append(None)
1273
          continue
1274
        decoded.append(objects.ImportExportStatus.FromDict(i))
1275

    
1276
      result.payload = decoded
1277

    
1278
    return result
1279

    
1280
  def call_impexp_abort(self, node, name):
1281
    """Aborts an import or export.
1282

1283
    This is a single-node call.
1284

1285
    @type node: string
1286
    @param node: Node name
1287
    @type name: string
1288
    @param name: Import/export name
1289

1290
    """
1291
    return self._SingleNodeCall(node, "impexp_abort", [name])
1292

    
1293
  def call_impexp_cleanup(self, node, name):
1294
    """Cleans up after an import or export.
1295

1296
    This is a single-node call.
1297

1298
    @type node: string
1299
    @param node: Node name
1300
    @type name: string
1301
    @param name: Import/export name
1302

1303
    """
1304
    return self._SingleNodeCall(node, "impexp_cleanup", [name])