Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 96acbc09

History | View | Annotate | Download (33.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @type failed: boolean
87
  @ivar failed: whether the operation failed at transport level (not
88
      application level on the remote node)
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.fail_msg = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.fail_msg = self._EnsureErr(data)
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      if not isinstance(self.data, (tuple, list)):
114
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115
                         type(self.data))
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
      elif not self.data[0]:
120
        self.fail_msg = self._EnsureErr(self.data[1])
121
      else:
122
        # finally success
123
        self.fail_msg = None
124
        self.payload = data[1]
125

    
126
  @staticmethod
127
  def _EnsureErr(val):
128
    """Helper to ensure we return a 'True' value for error."""
129
    if val:
130
      return val
131
    else:
132
      return "No error information"
133

    
134
  def Raise(self, msg, prereq=False):
135
    """If the result has failed, raise an OpExecError.
136

137
    This is used so that LU code doesn't have to check for each
138
    result, but instead can call this function.
139

140
    """
141
    if not self.fail_msg:
142
      return
143

    
144
    if not msg: # one could pass None for default message
145
      msg = ("Call '%s' to node '%s' has failed: %s" %
146
             (self.call, self.node, self.fail_msg))
147
    else:
148
      msg = "%s: %s" % (msg, self.fail_msg)
149
    if prereq:
150
      ec = errors.OpPrereqError
151
    else:
152
      ec = errors.OpExecError
153
    raise ec(msg)
154

    
155
  def RemoteFailMsg(self):
156
    """Check if the remote procedure failed.
157

158
    @return: the fail_msg attribute
159

160
    """
161
    return self.fail_msg
162

    
163

    
164
class Client:
165
  """RPC Client class.
166

167
  This class, given a (remote) method name, a list of parameters and a
168
  list of nodes, will contact (in parallel) all nodes, and return a
169
  dict of results (key: node name, value: result).
170

171
  One current bug is that generic failure is still signaled by
172
  'False' result, which is not good. This overloading of values can
173
  cause bugs.
174

175
  """
176
  def __init__(self, procedure, body, port):
177
    self.procedure = procedure
178
    self.body = body
179
    self.port = port
180
    self.nc = {}
181

    
182
    self._ssl_params = \
183
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184
                         ssl_cert_path=constants.SSL_CERT_FILE)
185

    
186
  def ConnectList(self, node_list, address_list=None):
187
    """Add a list of nodes to the target nodes.
188

189
    @type node_list: list
190
    @param node_list: the list of node names to connect
191
    @type address_list: list or None
192
    @keyword address_list: either None or a list with node addresses,
193
        which must have the same length as the node list
194

195
    """
196
    if address_list is None:
197
      address_list = [None for _ in node_list]
198
    else:
199
      assert len(node_list) == len(address_list), \
200
             "Name and address lists should have the same length"
201
    for node, address in zip(node_list, address_list):
202
      self.ConnectNode(node, address)
203

    
204
  def ConnectNode(self, name, address=None):
205
    """Add a node to the target list.
206

207
    @type name: str
208
    @param name: the node name
209
    @type address: str
210
    @keyword address: the node address, if known
211

212
    """
213
    if address is None:
214
      address = name
215

    
216
    self.nc[name] = \
217
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218
                                    "/%s" % self.procedure,
219
                                    post_data=self.body,
220
                                    ssl_params=self._ssl_params,
221
                                    ssl_verify_peer=True)
222

    
223
  def GetResults(self):
224
    """Call nodes and return results.
225

226
    @rtype: list
227
    @return: List of RPC results
228

229
    """
230
    assert _http_manager, "RPC module not initialized"
231

    
232
    _http_manager.ExecRequests(self.nc.values())
233

    
234
    results = {}
235

    
236
    for name, req in self.nc.iteritems():
237
      if req.success and req.resp_status_code == http.HTTP_OK:
238
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239
                                  node=name, call=self.procedure)
240
        continue
241

    
242
      # TODO: Better error reporting
243
      if req.error:
244
        msg = req.error
245
      else:
246
        msg = req.resp_body
247

    
248
      logging.error("RPC error in %s from node %s: %s",
249
                    self.procedure, name, msg)
250
      results[name] = RpcResult(data=msg, failed=True, node=name,
251
                                call=self.procedure)
252

    
253
    return results
254

    
255

    
256
class RpcRunner(object):
257
  """RPC runner class"""
258

    
259
  def __init__(self, cfg):
260
    """Initialized the rpc runner.
261

262
    @type cfg:  C{config.ConfigWriter}
263
    @param cfg: the configuration object that will be used to get data
264
                about the cluster
265

266
    """
267
    self._cfg = cfg
268
    self.port = utils.GetDaemonPort(constants.NODED)
269

    
270
  def _InstDict(self, instance, hvp=None, bep=None):
271
    """Convert the given instance to a dict.
272

273
    This is done via the instance's ToDict() method and additionally
274
    we fill the hvparams with the cluster defaults.
275

276
    @type instance: L{objects.Instance}
277
    @param instance: an Instance object
278
    @type hvp: dict or None
279
    @param hvp: a dictionary with overridden hypervisor parameters
280
    @type bep: dict or None
281
    @param bep: a dictionary with overridden backend parameters
282
    @rtype: dict
283
    @return: the instance dict, with the hvparams filled with the
284
        cluster defaults
285

286
    """
287
    idict = instance.ToDict()
288
    cluster = self._cfg.GetClusterInfo()
289
    idict["hvparams"] = cluster.FillHV(instance)
290
    if hvp is not None:
291
      idict["hvparams"].update(hvp)
292
    idict["beparams"] = cluster.FillBE(instance)
293
    if bep is not None:
294
      idict["beparams"].update(bep)
295
    for nic in idict["nics"]:
296
      nic['nicparams'] = objects.FillDict(
297
        cluster.nicparams[constants.PP_DEFAULT],
298
        nic['nicparams'])
299
    return idict
300

    
301
  def _ConnectList(self, client, node_list, call):
302
    """Helper for computing node addresses.
303

304
    @type client: L{ganeti.rpc.Client}
305
    @param client: a C{Client} instance
306
    @type node_list: list
307
    @param node_list: the node list we should connect
308
    @type call: string
309
    @param call: the name of the remote procedure call, for filling in
310
        correctly any eventual offline nodes' results
311

312
    """
313
    all_nodes = self._cfg.GetAllNodesInfo()
314
    name_list = []
315
    addr_list = []
316
    skip_dict = {}
317
    for node in node_list:
318
      if node in all_nodes:
319
        if all_nodes[node].offline:
320
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321
          continue
322
        val = all_nodes[node].primary_ip
323
      else:
324
        val = None
325
      addr_list.append(val)
326
      name_list.append(node)
327
    if name_list:
328
      client.ConnectList(name_list, address_list=addr_list)
329
    return skip_dict
330

    
331
  def _ConnectNode(self, client, node, call):
332
    """Helper for computing one node's address.
333

334
    @type client: L{ganeti.rpc.Client}
335
    @param client: a C{Client} instance
336
    @type node: str
337
    @param node: the node we should connect
338
    @type call: string
339
    @param call: the name of the remote procedure call, for filling in
340
        correctly any eventual offline nodes' results
341

342
    """
343
    node_info = self._cfg.GetNodeInfo(node)
344
    if node_info is not None:
345
      if node_info.offline:
346
        return RpcResult(node=node, offline=True, call=call)
347
      addr = node_info.primary_ip
348
    else:
349
      addr = None
350
    client.ConnectNode(node, address=addr)
351

    
352
  def _MultiNodeCall(self, node_list, procedure, args):
353
    """Helper for making a multi-node call
354

355
    """
356
    body = serializer.DumpJson(args, indent=False)
357
    c = Client(procedure, body, self.port)
358
    skip_dict = self._ConnectList(c, node_list, procedure)
359
    skip_dict.update(c.GetResults())
360
    return skip_dict
361

    
362
  @classmethod
363
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
364
                           address_list=None):
365
    """Helper for making a multi-node static call
366

367
    """
368
    body = serializer.DumpJson(args, indent=False)
369
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370
    c.ConnectList(node_list, address_list=address_list)
371
    return c.GetResults()
372

    
373
  def _SingleNodeCall(self, node, procedure, args):
374
    """Helper for making a single-node call
375

376
    """
377
    body = serializer.DumpJson(args, indent=False)
378
    c = Client(procedure, body, self.port)
379
    result = self._ConnectNode(c, node, procedure)
380
    if result is None:
381
      # we did connect, node is not offline
382
      result = c.GetResults()[node]
383
    return result
384

    
385
  @classmethod
386
  def _StaticSingleNodeCall(cls, node, procedure, args):
387
    """Helper for making a single-node static call
388

389
    """
390
    body = serializer.DumpJson(args, indent=False)
391
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
392
    c.ConnectNode(node)
393
    return c.GetResults()[node]
394

    
395
  @staticmethod
396
  def _Compress(data):
397
    """Compresses a string for transport over RPC.
398

399
    Small amounts of data are not compressed.
400

401
    @type data: str
402
    @param data: Data
403
    @rtype: tuple
404
    @return: Encoded data to send
405

406
    """
407
    # Small amounts of data are not compressed
408
    if len(data) < 512:
409
      return (constants.RPC_ENCODING_NONE, data)
410

    
411
    # Compress with zlib and encode in base64
412
    return (constants.RPC_ENCODING_ZLIB_BASE64,
413
            base64.b64encode(zlib.compress(data, 3)))
414

    
415
  #
416
  # Begin RPC calls
417
  #
418

    
419
  def call_lv_list(self, node_list, vg_name):
420
    """Gets the logical volumes present in a given volume group.
421

422
    This is a multi-node call.
423

424
    """
425
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426

    
427
  def call_vg_list(self, node_list):
428
    """Gets the volume group list.
429

430
    This is a multi-node call.
431

432
    """
433
    return self._MultiNodeCall(node_list, "vg_list", [])
434

    
435
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
436
    """Get list of storage units.
437

438
    This is a multi-node call.
439

440
    """
441
    return self._MultiNodeCall(node_list, "storage_list",
442
                               [su_name, su_args, name, fields])
443

    
444
  def call_storage_modify(self, node, su_name, su_args, name, changes):
445
    """Modify a storage unit.
446

447
    This is a single-node call.
448

449
    """
450
    return self._SingleNodeCall(node, "storage_modify",
451
                                [su_name, su_args, name, changes])
452

    
453
  def call_bridges_exist(self, node, bridges_list):
454
    """Checks if a node has all the bridges given.
455

456
    This method checks if all bridges given in the bridges_list are
457
    present on the remote node, so that an instance that uses interfaces
458
    on those bridges can be started.
459

460
    This is a single-node call.
461

462
    """
463
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
464

    
465
  def call_instance_start(self, node, instance, hvp, bep):
466
    """Starts an instance.
467

468
    This is a single-node call.
469

470
    """
471
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
472
    return self._SingleNodeCall(node, "instance_start", [idict])
473

    
474
  def call_instance_shutdown(self, node, instance):
475
    """Stops an instance.
476

477
    This is a single-node call.
478

479
    """
480
    return self._SingleNodeCall(node, "instance_shutdown",
481
                                [self._InstDict(instance)])
482

    
483
  def call_migration_info(self, node, instance):
484
    """Gather the information necessary to prepare an instance migration.
485

486
    This is a single-node call.
487

488
    @type node: string
489
    @param node: the node on which the instance is currently running
490
    @type instance: C{objects.Instance}
491
    @param instance: the instance definition
492

493
    """
494
    return self._SingleNodeCall(node, "migration_info",
495
                                [self._InstDict(instance)])
496

    
497
  def call_accept_instance(self, node, instance, info, target):
498
    """Prepare a node to accept an instance.
499

500
    This is a single-node call.
501

502
    @type node: string
503
    @param node: the target node for the migration
504
    @type instance: C{objects.Instance}
505
    @param instance: the instance definition
506
    @type info: opaque/hypervisor specific (string/data)
507
    @param info: result for the call_migration_info call
508
    @type target: string
509
    @param target: target hostname (usually ip address) (on the node itself)
510

511
    """
512
    return self._SingleNodeCall(node, "accept_instance",
513
                                [self._InstDict(instance), info, target])
514

    
515
  def call_finalize_migration(self, node, instance, info, success):
516
    """Finalize any target-node migration specific operation.
517

518
    This is called both in case of a successful migration and in case of error
519
    (in which case it should abort the migration).
520

521
    This is a single-node call.
522

523
    @type node: string
524
    @param node: the target node for the migration
525
    @type instance: C{objects.Instance}
526
    @param instance: the instance definition
527
    @type info: opaque/hypervisor specific (string/data)
528
    @param info: result for the call_migration_info call
529
    @type success: boolean
530
    @param success: whether the migration was a success or a failure
531

532
    """
533
    return self._SingleNodeCall(node, "finalize_migration",
534
                                [self._InstDict(instance), info, success])
535

    
536
  def call_instance_migrate(self, node, instance, target, live):
537
    """Migrate an instance.
538

539
    This is a single-node call.
540

541
    @type node: string
542
    @param node: the node on which the instance is currently running
543
    @type instance: C{objects.Instance}
544
    @param instance: the instance definition
545
    @type target: string
546
    @param target: the target node name
547
    @type live: boolean
548
    @param live: whether the migration should be done live or not (the
549
        interpretation of this parameter is left to the hypervisor)
550

551
    """
552
    return self._SingleNodeCall(node, "instance_migrate",
553
                                [self._InstDict(instance), target, live])
554

    
555
  def call_instance_reboot(self, node, instance, reboot_type):
556
    """Reboots an instance.
557

558
    This is a single-node call.
559

560
    """
561
    return self._SingleNodeCall(node, "instance_reboot",
562
                                [self._InstDict(instance), reboot_type])
563

    
564
  def call_instance_os_add(self, node, inst, reinstall):
565
    """Installs an OS on the given instance.
566

567
    This is a single-node call.
568

569
    """
570
    return self._SingleNodeCall(node, "instance_os_add",
571
                                [self._InstDict(inst), reinstall])
572

    
573
  def call_instance_run_rename(self, node, inst, old_name):
574
    """Run the OS rename script for an instance.
575

576
    This is a single-node call.
577

578
    """
579
    return self._SingleNodeCall(node, "instance_run_rename",
580
                                [self._InstDict(inst), old_name])
581

    
582
  def call_instance_info(self, node, instance, hname):
583
    """Returns information about a single instance.
584

585
    This is a single-node call.
586

587
    @type node: list
588
    @param node: the list of nodes to query
589
    @type instance: string
590
    @param instance: the instance name
591
    @type hname: string
592
    @param hname: the hypervisor type of the instance
593

594
    """
595
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
596

    
597
  def call_instance_migratable(self, node, instance):
598
    """Checks whether the given instance can be migrated.
599

600
    This is a single-node call.
601

602
    @param node: the node to query
603
    @type instance: L{objects.Instance}
604
    @param instance: the instance to check
605

606

607
    """
608
    return self._SingleNodeCall(node, "instance_migratable",
609
                                [self._InstDict(instance)])
610

    
611
  def call_all_instances_info(self, node_list, hypervisor_list):
612
    """Returns information about all instances on the given nodes.
613

614
    This is a multi-node call.
615

616
    @type node_list: list
617
    @param node_list: the list of nodes to query
618
    @type hypervisor_list: list
619
    @param hypervisor_list: the hypervisors to query for instances
620

621
    """
622
    return self._MultiNodeCall(node_list, "all_instances_info",
623
                               [hypervisor_list])
624

    
625
  def call_instance_list(self, node_list, hypervisor_list):
626
    """Returns the list of running instances on a given node.
627

628
    This is a multi-node call.
629

630
    @type node_list: list
631
    @param node_list: the list of nodes to query
632
    @type hypervisor_list: list
633
    @param hypervisor_list: the hypervisors to query for instances
634

635
    """
636
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
637

    
638
  def call_node_tcp_ping(self, node, source, target, port, timeout,
639
                         live_port_needed):
640
    """Do a TcpPing on the remote node
641

642
    This is a single-node call.
643

644
    """
645
    return self._SingleNodeCall(node, "node_tcp_ping",
646
                                [source, target, port, timeout,
647
                                 live_port_needed])
648

    
649
  def call_node_has_ip_address(self, node, address):
650
    """Checks if a node has the given IP address.
651

652
    This is a single-node call.
653

654
    """
655
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
656

    
657
  def call_node_info(self, node_list, vg_name, hypervisor_type):
658
    """Return node information.
659

660
    This will return memory information and volume group size and free
661
    space.
662

663
    This is a multi-node call.
664

665
    @type node_list: list
666
    @param node_list: the list of nodes to query
667
    @type vg_name: C{string}
668
    @param vg_name: the name of the volume group to ask for disk space
669
        information
670
    @type hypervisor_type: C{str}
671
    @param hypervisor_type: the name of the hypervisor to ask for
672
        memory information
673

674
    """
675
    return self._MultiNodeCall(node_list, "node_info",
676
                               [vg_name, hypervisor_type])
677

    
678
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
679
    """Add a node to the cluster.
680

681
    This is a single-node call.
682

683
    """
684
    return self._SingleNodeCall(node, "node_add",
685
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
686

    
687
  def call_node_verify(self, node_list, checkdict, cluster_name):
688
    """Request verification of given parameters.
689

690
    This is a multi-node call.
691

692
    """
693
    return self._MultiNodeCall(node_list, "node_verify",
694
                               [checkdict, cluster_name])
695

    
696
  @classmethod
697
  def call_node_start_master(cls, node, start_daemons, no_voting):
698
    """Tells a node to activate itself as a master.
699

700
    This is a single-node call.
701

702
    """
703
    return cls._StaticSingleNodeCall(node, "node_start_master",
704
                                     [start_daemons, no_voting])
705

    
706
  @classmethod
707
  def call_node_stop_master(cls, node, stop_daemons):
708
    """Tells a node to demote itself from master status.
709

710
    This is a single-node call.
711

712
    """
713
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
714

    
715
  @classmethod
716
  def call_master_info(cls, node_list):
717
    """Query master info.
718

719
    This is a multi-node call.
720

721
    """
722
    # TODO: should this method query down nodes?
723
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
724

    
725
  def call_version(self, node_list):
726
    """Query node version.
727

728
    This is a multi-node call.
729

730
    """
731
    return self._MultiNodeCall(node_list, "version", [])
732

    
733
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
734
    """Request creation of a given block device.
735

736
    This is a single-node call.
737

738
    """
739
    return self._SingleNodeCall(node, "blockdev_create",
740
                                [bdev.ToDict(), size, owner, on_primary, info])
741

    
742
  def call_blockdev_remove(self, node, bdev):
743
    """Request removal of a given block device.
744

745
    This is a single-node call.
746

747
    """
748
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
749

    
750
  def call_blockdev_rename(self, node, devlist):
751
    """Request rename of the given block devices.
752

753
    This is a single-node call.
754

755
    """
756
    return self._SingleNodeCall(node, "blockdev_rename",
757
                                [(d.ToDict(), uid) for d, uid in devlist])
758

    
759
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
760
    """Request assembling of a given block device.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "blockdev_assemble",
766
                                [disk.ToDict(), owner, on_primary])
767

    
768
  def call_blockdev_shutdown(self, node, disk):
769
    """Request shutdown of a given block device.
770

771
    This is a single-node call.
772

773
    """
774
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
775

    
776
  def call_blockdev_addchildren(self, node, bdev, ndevs):
777
    """Request adding a list of children to a (mirroring) device.
778

779
    This is a single-node call.
780

781
    """
782
    return self._SingleNodeCall(node, "blockdev_addchildren",
783
                                [bdev.ToDict(),
784
                                 [disk.ToDict() for disk in ndevs]])
785

    
786
  def call_blockdev_removechildren(self, node, bdev, ndevs):
787
    """Request removing a list of children from a (mirroring) device.
788

789
    This is a single-node call.
790

791
    """
792
    return self._SingleNodeCall(node, "blockdev_removechildren",
793
                                [bdev.ToDict(),
794
                                 [disk.ToDict() for disk in ndevs]])
795

    
796
  def call_blockdev_getmirrorstatus(self, node, disks):
797
    """Request status of a (mirroring) device.
798

799
    This is a single-node call.
800

801
    """
802
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
803
                                [dsk.ToDict() for dsk in disks])
804

    
805
  def call_blockdev_find(self, node, disk):
806
    """Request identification of a given block device.
807

808
    This is a single-node call.
809

810
    """
811
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
812
    if not result.failed and result.payload is not None:
813
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
814
    return result
815

    
816
  def call_blockdev_close(self, node, instance_name, disks):
817
    """Closes the given block devices.
818

819
    This is a single-node call.
820

821
    """
822
    params = [instance_name, [cf.ToDict() for cf in disks]]
823
    return self._SingleNodeCall(node, "blockdev_close", params)
824

    
825
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
826
    """Disconnects the network of the given drbd devices.
827

828
    This is a multi-node call.
829

830
    """
831
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
832
                               [nodes_ip, [cf.ToDict() for cf in disks]])
833

    
834
  def call_drbd_attach_net(self, node_list, nodes_ip,
835
                           disks, instance_name, multimaster):
836
    """Disconnects the given drbd devices.
837

838
    This is a multi-node call.
839

840
    """
841
    return self._MultiNodeCall(node_list, "drbd_attach_net",
842
                               [nodes_ip, [cf.ToDict() for cf in disks],
843
                                instance_name, multimaster])
844

    
845
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
846
    """Waits for the synchronization of drbd devices is complete.
847

848
    This is a multi-node call.
849

850
    """
851
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
852
                               [nodes_ip, [cf.ToDict() for cf in disks]])
853

    
854
  @classmethod
855
  def call_upload_file(cls, node_list, file_name, address_list=None):
856
    """Upload a file.
857

858
    The node will refuse the operation in case the file is not on the
859
    approved file list.
860

861
    This is a multi-node call.
862

863
    @type node_list: list
864
    @param node_list: the list of node names to upload to
865
    @type file_name: str
866
    @param file_name: the filename to upload
867
    @type address_list: list or None
868
    @keyword address_list: an optional list of node addresses, in order
869
        to optimize the RPC speed
870

871
    """
872
    file_contents = utils.ReadFile(file_name)
873
    data = cls._Compress(file_contents)
874
    st = os.stat(file_name)
875
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
876
              st.st_atime, st.st_mtime]
877
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
878
                                    address_list=address_list)
879

    
880
  @classmethod
881
  def call_write_ssconf_files(cls, node_list, values):
882
    """Write ssconf files.
883

884
    This is a multi-node call.
885

886
    """
887
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
888

    
889
  def call_os_diagnose(self, node_list):
890
    """Request a diagnose of OS definitions.
891

892
    This is a multi-node call.
893

894
    """
895
    return self._MultiNodeCall(node_list, "os_diagnose", [])
896

    
897
  def call_os_get(self, node, name):
898
    """Returns an OS definition.
899

900
    This is a single-node call.
901

902
    """
903
    result = self._SingleNodeCall(node, "os_get", [name])
904
    if not result.failed and isinstance(result.data, dict):
905
      result.data = objects.OS.FromDict(result.data)
906
    return result
907

    
908
  def call_hooks_runner(self, node_list, hpath, phase, env):
909
    """Call the hooks runner.
910

911
    Args:
912
      - op: the OpCode instance
913
      - env: a dictionary with the environment
914

915
    This is a multi-node call.
916

917
    """
918
    params = [hpath, phase, env]
919
    return self._MultiNodeCall(node_list, "hooks_runner", params)
920

    
921
  def call_iallocator_runner(self, node, name, idata):
922
    """Call an iallocator on a remote node
923

924
    Args:
925
      - name: the iallocator name
926
      - input: the json-encoded input string
927

928
    This is a single-node call.
929

930
    """
931
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
932

    
933
  def call_blockdev_grow(self, node, cf_bdev, amount):
934
    """Request a snapshot of the given block device.
935

936
    This is a single-node call.
937

938
    """
939
    return self._SingleNodeCall(node, "blockdev_grow",
940
                                [cf_bdev.ToDict(), amount])
941

    
942
  def call_blockdev_snapshot(self, node, cf_bdev):
943
    """Request a snapshot of the given block device.
944

945
    This is a single-node call.
946

947
    """
948
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
949

    
950
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
951
                           cluster_name, idx):
952
    """Request the export of a given snapshot.
953

954
    This is a single-node call.
955

956
    """
957
    return self._SingleNodeCall(node, "snapshot_export",
958
                                [snap_bdev.ToDict(), dest_node,
959
                                 self._InstDict(instance), cluster_name, idx])
960

    
961
  def call_finalize_export(self, node, instance, snap_disks):
962
    """Request the completion of an export operation.
963

964
    This writes the export config file, etc.
965

966
    This is a single-node call.
967

968
    """
969
    flat_disks = []
970
    for disk in snap_disks:
971
      if isinstance(disk, bool):
972
        flat_disks.append(disk)
973
      else:
974
        flat_disks.append(disk.ToDict())
975

    
976
    return self._SingleNodeCall(node, "finalize_export",
977
                                [self._InstDict(instance), flat_disks])
978

    
979
  def call_export_info(self, node, path):
980
    """Queries the export information in a given path.
981

982
    This is a single-node call.
983

984
    """
985
    return self._SingleNodeCall(node, "export_info", [path])
986

    
987
  def call_instance_os_import(self, node, inst, src_node, src_images,
988
                              cluster_name):
989
    """Request the import of a backup into an instance.
990

991
    This is a single-node call.
992

993
    """
994
    return self._SingleNodeCall(node, "instance_os_import",
995
                                [self._InstDict(inst), src_node, src_images,
996
                                 cluster_name])
997

    
998
  def call_export_list(self, node_list):
999
    """Gets the stored exports list.
1000

1001
    This is a multi-node call.
1002

1003
    """
1004
    return self._MultiNodeCall(node_list, "export_list", [])
1005

    
1006
  def call_export_remove(self, node, export):
1007
    """Requests removal of a given export.
1008

1009
    This is a single-node call.
1010

1011
    """
1012
    return self._SingleNodeCall(node, "export_remove", [export])
1013

    
1014
  @classmethod
1015
  def call_node_leave_cluster(cls, node):
1016
    """Requests a node to clean the cluster information it has.
1017

1018
    This will remove the configuration information from the ganeti data
1019
    dir.
1020

1021
    This is a single-node call.
1022

1023
    """
1024
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1025

    
1026
  def call_node_volumes(self, node_list):
1027
    """Gets all volumes on node(s).
1028

1029
    This is a multi-node call.
1030

1031
    """
1032
    return self._MultiNodeCall(node_list, "node_volumes", [])
1033

    
1034
  def call_node_demote_from_mc(self, node):
1035
    """Demote a node from the master candidate role.
1036

1037
    This is a single-node call.
1038

1039
    """
1040
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1041

    
1042

    
1043
  def call_node_powercycle(self, node, hypervisor):
1044
    """Tries to powercycle a node.
1045

1046
    This is a single-node call.
1047

1048
    """
1049
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1050

    
1051

    
1052
  def call_test_delay(self, node_list, duration):
1053
    """Sleep for a fixed time on given node(s).
1054

1055
    This is a multi-node call.
1056

1057
    """
1058
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1059

    
1060
  def call_file_storage_dir_create(self, node, file_storage_dir):
1061
    """Create the given file storage directory.
1062

1063
    This is a single-node call.
1064

1065
    """
1066
    return self._SingleNodeCall(node, "file_storage_dir_create",
1067
                                [file_storage_dir])
1068

    
1069
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1070
    """Remove the given file storage directory.
1071

1072
    This is a single-node call.
1073

1074
    """
1075
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1076
                                [file_storage_dir])
1077

    
1078
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1079
                                   new_file_storage_dir):
1080
    """Rename file storage directory.
1081

1082
    This is a single-node call.
1083

1084
    """
1085
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1086
                                [old_file_storage_dir, new_file_storage_dir])
1087

    
1088
  @classmethod
1089
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1090
    """Update job queue.
1091

1092
    This is a multi-node call.
1093

1094
    """
1095
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1096
                                    [file_name, cls._Compress(content)],
1097
                                    address_list=address_list)
1098

    
1099
  @classmethod
1100
  def call_jobqueue_purge(cls, node):
1101
    """Purge job queue.
1102

1103
    This is a single-node call.
1104

1105
    """
1106
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1107

    
1108
  @classmethod
1109
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1110
    """Rename a job queue file.
1111

1112
    This is a multi-node call.
1113

1114
    """
1115
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1116
                                    address_list=address_list)
1117

    
1118
  @classmethod
1119
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1120
    """Set the drain flag on the queue.
1121

1122
    This is a multi-node call.
1123

1124
    @type node_list: list
1125
    @param node_list: the list of nodes to query
1126
    @type drain_flag: bool
1127
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1128

1129
    """
1130
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1131
                                    [drain_flag])
1132

    
1133
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1134
    """Validate the hypervisor params.
1135

1136
    This is a multi-node call.
1137

1138
    @type node_list: list
1139
    @param node_list: the list of nodes to query
1140
    @type hvname: string
1141
    @param hvname: the hypervisor name
1142
    @type hvparams: dict
1143
    @param hvparams: the hypervisor parameters to be validated
1144

1145
    """
1146
    cluster = self._cfg.GetClusterInfo()
1147
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1148
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1149
                               [hvname, hv_full])