Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 045dd6d9

History | View | Annotate | Download (34.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @ivar call: the name of the RPC call
87
  @ivar node: the name of the node to which we made the call
88
  @ivar offline: whether the operation failed because the node was
89
      offline, as opposed to actual failure; offline=True will always
90
      imply failed=True, in order to allow simpler checking if
91
      the user doesn't care about the exact failure mode
92
  @ivar fail_msg: the error message if the call failed
93

94
  """
95
  def __init__(self, data=None, failed=False, offline=False,
96
               call=None, node=None):
97
    self.offline = offline
98
    self.call = call
99
    self.node = node
100

    
101
    if offline:
102
      self.fail_msg = "Node is marked offline"
103
      self.data = self.payload = None
104
    elif failed:
105
      self.fail_msg = self._EnsureErr(data)
106
      self.data = self.payload = None
107
    else:
108
      self.data = data
109
      if not isinstance(self.data, (tuple, list)):
110
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
111
                         type(self.data))
112
        self.payload = None
113
      elif len(data) != 2:
114
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
115
                         "expected 2" % len(self.data))
116
        self.payload = None
117
      elif not self.data[0]:
118
        self.fail_msg = self._EnsureErr(self.data[1])
119
        self.payload = None
120
      else:
121
        # finally success
122
        self.fail_msg = None
123
        self.payload = data[1]
124

    
125
    assert hasattr(self, "call")
126
    assert hasattr(self, "data")
127
    assert hasattr(self, "fail_msg")
128
    assert hasattr(self, "node")
129
    assert hasattr(self, "offline")
130
    assert hasattr(self, "payload")
131

    
132
  @staticmethod
133
  def _EnsureErr(val):
134
    """Helper to ensure we return a 'True' value for error."""
135
    if val:
136
      return val
137
    else:
138
      return "No error information"
139

    
140
  def Raise(self, msg, prereq=False, ecode=None):
141
    """If the result has failed, raise an OpExecError.
142

143
    This is used so that LU code doesn't have to check for each
144
    result, but instead can call this function.
145

146
    """
147
    if not self.fail_msg:
148
      return
149

    
150
    if not msg: # one could pass None for default message
151
      msg = ("Call '%s' to node '%s' has failed: %s" %
152
             (self.call, self.node, self.fail_msg))
153
    else:
154
      msg = "%s: %s" % (msg, self.fail_msg)
155
    if prereq:
156
      ec = errors.OpPrereqError
157
    else:
158
      ec = errors.OpExecError
159
    if ecode is not None:
160
      args = (msg, prereq)
161
    else:
162
      args = (msg, )
163
    raise ec(*args)
164

    
165

    
166
class Client:
167
  """RPC Client class.
168

169
  This class, given a (remote) method name, a list of parameters and a
170
  list of nodes, will contact (in parallel) all nodes, and return a
171
  dict of results (key: node name, value: result).
172

173
  One current bug is that generic failure is still signaled by
174
  'False' result, which is not good. This overloading of values can
175
  cause bugs.
176

177
  """
178
  def __init__(self, procedure, body, port):
179
    self.procedure = procedure
180
    self.body = body
181
    self.port = port
182
    self.nc = {}
183

    
184
    self._ssl_params = \
185
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
186
                         ssl_cert_path=constants.SSL_CERT_FILE)
187

    
188
  def ConnectList(self, node_list, address_list=None):
189
    """Add a list of nodes to the target nodes.
190

191
    @type node_list: list
192
    @param node_list: the list of node names to connect
193
    @type address_list: list or None
194
    @keyword address_list: either None or a list with node addresses,
195
        which must have the same length as the node list
196

197
    """
198
    if address_list is None:
199
      address_list = [None for _ in node_list]
200
    else:
201
      assert len(node_list) == len(address_list), \
202
             "Name and address lists should have the same length"
203
    for node, address in zip(node_list, address_list):
204
      self.ConnectNode(node, address)
205

    
206
  def ConnectNode(self, name, address=None):
207
    """Add a node to the target list.
208

209
    @type name: str
210
    @param name: the node name
211
    @type address: str
212
    @keyword address: the node address, if known
213

214
    """
215
    if address is None:
216
      address = name
217

    
218
    self.nc[name] = \
219
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
220
                                    "/%s" % self.procedure,
221
                                    post_data=self.body,
222
                                    ssl_params=self._ssl_params,
223
                                    ssl_verify_peer=True)
224

    
225
  def GetResults(self):
226
    """Call nodes and return results.
227

228
    @rtype: list
229
    @return: List of RPC results
230

231
    """
232
    assert _http_manager, "RPC module not initialized"
233

    
234
    _http_manager.ExecRequests(self.nc.values())
235

    
236
    results = {}
237

    
238
    for name, req in self.nc.iteritems():
239
      if req.success and req.resp_status_code == http.HTTP_OK:
240
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
241
                                  node=name, call=self.procedure)
242
        continue
243

    
244
      # TODO: Better error reporting
245
      if req.error:
246
        msg = req.error
247
      else:
248
        msg = req.resp_body
249

    
250
      logging.error("RPC error in %s from node %s: %s",
251
                    self.procedure, name, msg)
252
      results[name] = RpcResult(data=msg, failed=True, node=name,
253
                                call=self.procedure)
254

    
255
    return results
256

    
257

    
258
class RpcRunner(object):
259
  """RPC runner class"""
260

    
261
  def __init__(self, cfg):
262
    """Initialized the rpc runner.
263

264
    @type cfg:  C{config.ConfigWriter}
265
    @param cfg: the configuration object that will be used to get data
266
                about the cluster
267

268
    """
269
    self._cfg = cfg
270
    self.port = utils.GetDaemonPort(constants.NODED)
271

    
272
  def _InstDict(self, instance, hvp=None, bep=None):
273
    """Convert the given instance to a dict.
274

275
    This is done via the instance's ToDict() method and additionally
276
    we fill the hvparams with the cluster defaults.
277

278
    @type instance: L{objects.Instance}
279
    @param instance: an Instance object
280
    @type hvp: dict or None
281
    @param hvp: a dictionary with overridden hypervisor parameters
282
    @type bep: dict or None
283
    @param bep: a dictionary with overridden backend parameters
284
    @rtype: dict
285
    @return: the instance dict, with the hvparams filled with the
286
        cluster defaults
287

288
    """
289
    idict = instance.ToDict()
290
    cluster = self._cfg.GetClusterInfo()
291
    idict["hvparams"] = cluster.FillHV(instance)
292
    if hvp is not None:
293
      idict["hvparams"].update(hvp)
294
    idict["beparams"] = cluster.FillBE(instance)
295
    if bep is not None:
296
      idict["beparams"].update(bep)
297
    for nic in idict["nics"]:
298
      nic['nicparams'] = objects.FillDict(
299
        cluster.nicparams[constants.PP_DEFAULT],
300
        nic['nicparams'])
301
    return idict
302

    
303
  def _ConnectList(self, client, node_list, call):
304
    """Helper for computing node addresses.
305

306
    @type client: L{ganeti.rpc.Client}
307
    @param client: a C{Client} instance
308
    @type node_list: list
309
    @param node_list: the node list we should connect
310
    @type call: string
311
    @param call: the name of the remote procedure call, for filling in
312
        correctly any eventual offline nodes' results
313

314
    """
315
    all_nodes = self._cfg.GetAllNodesInfo()
316
    name_list = []
317
    addr_list = []
318
    skip_dict = {}
319
    for node in node_list:
320
      if node in all_nodes:
321
        if all_nodes[node].offline:
322
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
323
          continue
324
        val = all_nodes[node].primary_ip
325
      else:
326
        val = None
327
      addr_list.append(val)
328
      name_list.append(node)
329
    if name_list:
330
      client.ConnectList(name_list, address_list=addr_list)
331
    return skip_dict
332

    
333
  def _ConnectNode(self, client, node, call):
334
    """Helper for computing one node's address.
335

336
    @type client: L{ganeti.rpc.Client}
337
    @param client: a C{Client} instance
338
    @type node: str
339
    @param node: the node we should connect
340
    @type call: string
341
    @param call: the name of the remote procedure call, for filling in
342
        correctly any eventual offline nodes' results
343

344
    """
345
    node_info = self._cfg.GetNodeInfo(node)
346
    if node_info is not None:
347
      if node_info.offline:
348
        return RpcResult(node=node, offline=True, call=call)
349
      addr = node_info.primary_ip
350
    else:
351
      addr = None
352
    client.ConnectNode(node, address=addr)
353

    
354
  def _MultiNodeCall(self, node_list, procedure, args):
355
    """Helper for making a multi-node call
356

357
    """
358
    body = serializer.DumpJson(args, indent=False)
359
    c = Client(procedure, body, self.port)
360
    skip_dict = self._ConnectList(c, node_list, procedure)
361
    skip_dict.update(c.GetResults())
362
    return skip_dict
363

    
364
  @classmethod
365
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
366
                           address_list=None):
367
    """Helper for making a multi-node static call
368

369
    """
370
    body = serializer.DumpJson(args, indent=False)
371
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
372
    c.ConnectList(node_list, address_list=address_list)
373
    return c.GetResults()
374

    
375
  def _SingleNodeCall(self, node, procedure, args):
376
    """Helper for making a single-node call
377

378
    """
379
    body = serializer.DumpJson(args, indent=False)
380
    c = Client(procedure, body, self.port)
381
    result = self._ConnectNode(c, node, procedure)
382
    if result is None:
383
      # we did connect, node is not offline
384
      result = c.GetResults()[node]
385
    return result
386

    
387
  @classmethod
388
  def _StaticSingleNodeCall(cls, node, procedure, args):
389
    """Helper for making a single-node static call
390

391
    """
392
    body = serializer.DumpJson(args, indent=False)
393
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
394
    c.ConnectNode(node)
395
    return c.GetResults()[node]
396

    
397
  @staticmethod
398
  def _Compress(data):
399
    """Compresses a string for transport over RPC.
400

401
    Small amounts of data are not compressed.
402

403
    @type data: str
404
    @param data: Data
405
    @rtype: tuple
406
    @return: Encoded data to send
407

408
    """
409
    # Small amounts of data are not compressed
410
    if len(data) < 512:
411
      return (constants.RPC_ENCODING_NONE, data)
412

    
413
    # Compress with zlib and encode in base64
414
    return (constants.RPC_ENCODING_ZLIB_BASE64,
415
            base64.b64encode(zlib.compress(data, 3)))
416

    
417
  #
418
  # Begin RPC calls
419
  #
420

    
421
  def call_lv_list(self, node_list, vg_name):
422
    """Gets the logical volumes present in a given volume group.
423

424
    This is a multi-node call.
425

426
    """
427
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
428

    
429
  def call_vg_list(self, node_list):
430
    """Gets the volume group list.
431

432
    This is a multi-node call.
433

434
    """
435
    return self._MultiNodeCall(node_list, "vg_list", [])
436

    
437
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
438
    """Get list of storage units.
439

440
    This is a multi-node call.
441

442
    """
443
    return self._MultiNodeCall(node_list, "storage_list",
444
                               [su_name, su_args, name, fields])
445

    
446
  def call_storage_modify(self, node, su_name, su_args, name, changes):
447
    """Modify a storage unit.
448

449
    This is a single-node call.
450

451
    """
452
    return self._SingleNodeCall(node, "storage_modify",
453
                                [su_name, su_args, name, changes])
454

    
455
  def call_storage_execute(self, node, su_name, su_args, name, op):
456
    """Executes an operation on a storage unit.
457

458
    This is a single-node call.
459

460
    """
461
    return self._SingleNodeCall(node, "storage_execute",
462
                                [su_name, su_args, name, op])
463

    
464
  def call_bridges_exist(self, node, bridges_list):
465
    """Checks if a node has all the bridges given.
466

467
    This method checks if all bridges given in the bridges_list are
468
    present on the remote node, so that an instance that uses interfaces
469
    on those bridges can be started.
470

471
    This is a single-node call.
472

473
    """
474
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
475

    
476
  def call_instance_start(self, node, instance, hvp, bep):
477
    """Starts an instance.
478

479
    This is a single-node call.
480

481
    """
482
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
483
    return self._SingleNodeCall(node, "instance_start", [idict])
484

    
485
  def call_instance_shutdown(self, node, instance, timeout):
486
    """Stops an instance.
487

488
    This is a single-node call.
489

490
    """
491
    return self._SingleNodeCall(node, "instance_shutdown",
492
                                [self._InstDict(instance), timeout])
493

    
494
  def call_migration_info(self, node, instance):
495
    """Gather the information necessary to prepare an instance migration.
496

497
    This is a single-node call.
498

499
    @type node: string
500
    @param node: the node on which the instance is currently running
501
    @type instance: C{objects.Instance}
502
    @param instance: the instance definition
503

504
    """
505
    return self._SingleNodeCall(node, "migration_info",
506
                                [self._InstDict(instance)])
507

    
508
  def call_accept_instance(self, node, instance, info, target):
509
    """Prepare a node to accept an instance.
510

511
    This is a single-node call.
512

513
    @type node: string
514
    @param node: the target node for the migration
515
    @type instance: C{objects.Instance}
516
    @param instance: the instance definition
517
    @type info: opaque/hypervisor specific (string/data)
518
    @param info: result for the call_migration_info call
519
    @type target: string
520
    @param target: target hostname (usually ip address) (on the node itself)
521

522
    """
523
    return self._SingleNodeCall(node, "accept_instance",
524
                                [self._InstDict(instance), info, target])
525

    
526
  def call_finalize_migration(self, node, instance, info, success):
527
    """Finalize any target-node migration specific operation.
528

529
    This is called both in case of a successful migration and in case of error
530
    (in which case it should abort the migration).
531

532
    This is a single-node call.
533

534
    @type node: string
535
    @param node: the target node for the migration
536
    @type instance: C{objects.Instance}
537
    @param instance: the instance definition
538
    @type info: opaque/hypervisor specific (string/data)
539
    @param info: result for the call_migration_info call
540
    @type success: boolean
541
    @param success: whether the migration was a success or a failure
542

543
    """
544
    return self._SingleNodeCall(node, "finalize_migration",
545
                                [self._InstDict(instance), info, success])
546

    
547
  def call_instance_migrate(self, node, instance, target, live):
548
    """Migrate an instance.
549

550
    This is a single-node call.
551

552
    @type node: string
553
    @param node: the node on which the instance is currently running
554
    @type instance: C{objects.Instance}
555
    @param instance: the instance definition
556
    @type target: string
557
    @param target: the target node name
558
    @type live: boolean
559
    @param live: whether the migration should be done live or not (the
560
        interpretation of this parameter is left to the hypervisor)
561

562
    """
563
    return self._SingleNodeCall(node, "instance_migrate",
564
                                [self._InstDict(instance), target, live])
565

    
566
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
567
    """Reboots an instance.
568

569
    This is a single-node call.
570

571
    """
572
    return self._SingleNodeCall(node, "instance_reboot",
573
                                [self._InstDict(inst), reboot_type,
574
                                 shutdown_timeout])
575

    
576
  def call_instance_os_add(self, node, inst, reinstall):
577
    """Installs an OS on the given instance.
578

579
    This is a single-node call.
580

581
    """
582
    return self._SingleNodeCall(node, "instance_os_add",
583
                                [self._InstDict(inst), reinstall])
584

    
585
  def call_instance_run_rename(self, node, inst, old_name):
586
    """Run the OS rename script for an instance.
587

588
    This is a single-node call.
589

590
    """
591
    return self._SingleNodeCall(node, "instance_run_rename",
592
                                [self._InstDict(inst), old_name])
593

    
594
  def call_instance_info(self, node, instance, hname):
595
    """Returns information about a single instance.
596

597
    This is a single-node call.
598

599
    @type node: list
600
    @param node: the list of nodes to query
601
    @type instance: string
602
    @param instance: the instance name
603
    @type hname: string
604
    @param hname: the hypervisor type of the instance
605

606
    """
607
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
608

    
609
  def call_instance_migratable(self, node, instance):
610
    """Checks whether the given instance can be migrated.
611

612
    This is a single-node call.
613

614
    @param node: the node to query
615
    @type instance: L{objects.Instance}
616
    @param instance: the instance to check
617

618

619
    """
620
    return self._SingleNodeCall(node, "instance_migratable",
621
                                [self._InstDict(instance)])
622

    
623
  def call_all_instances_info(self, node_list, hypervisor_list):
624
    """Returns information about all instances on the given nodes.
625

626
    This is a multi-node call.
627

628
    @type node_list: list
629
    @param node_list: the list of nodes to query
630
    @type hypervisor_list: list
631
    @param hypervisor_list: the hypervisors to query for instances
632

633
    """
634
    return self._MultiNodeCall(node_list, "all_instances_info",
635
                               [hypervisor_list])
636

    
637
  def call_instance_list(self, node_list, hypervisor_list):
638
    """Returns the list of running instances on a given node.
639

640
    This is a multi-node call.
641

642
    @type node_list: list
643
    @param node_list: the list of nodes to query
644
    @type hypervisor_list: list
645
    @param hypervisor_list: the hypervisors to query for instances
646

647
    """
648
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
649

    
650
  def call_node_tcp_ping(self, node, source, target, port, timeout,
651
                         live_port_needed):
652
    """Do a TcpPing on the remote node
653

654
    This is a single-node call.
655

656
    """
657
    return self._SingleNodeCall(node, "node_tcp_ping",
658
                                [source, target, port, timeout,
659
                                 live_port_needed])
660

    
661
  def call_node_has_ip_address(self, node, address):
662
    """Checks if a node has the given IP address.
663

664
    This is a single-node call.
665

666
    """
667
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
668

    
669
  def call_node_info(self, node_list, vg_name, hypervisor_type):
670
    """Return node information.
671

672
    This will return memory information and volume group size and free
673
    space.
674

675
    This is a multi-node call.
676

677
    @type node_list: list
678
    @param node_list: the list of nodes to query
679
    @type vg_name: C{string}
680
    @param vg_name: the name of the volume group to ask for disk space
681
        information
682
    @type hypervisor_type: C{str}
683
    @param hypervisor_type: the name of the hypervisor to ask for
684
        memory information
685

686
    """
687
    return self._MultiNodeCall(node_list, "node_info",
688
                               [vg_name, hypervisor_type])
689

    
690
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
691
    """Add a node to the cluster.
692

693
    This is a single-node call.
694

695
    """
696
    return self._SingleNodeCall(node, "node_add",
697
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
698

    
699
  def call_node_verify(self, node_list, checkdict, cluster_name):
700
    """Request verification of given parameters.
701

702
    This is a multi-node call.
703

704
    """
705
    return self._MultiNodeCall(node_list, "node_verify",
706
                               [checkdict, cluster_name])
707

    
708
  @classmethod
709
  def call_node_start_master(cls, node, start_daemons, no_voting):
710
    """Tells a node to activate itself as a master.
711

712
    This is a single-node call.
713

714
    """
715
    return cls._StaticSingleNodeCall(node, "node_start_master",
716
                                     [start_daemons, no_voting])
717

    
718
  @classmethod
719
  def call_node_stop_master(cls, node, stop_daemons):
720
    """Tells a node to demote itself from master status.
721

722
    This is a single-node call.
723

724
    """
725
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
726

    
727
  @classmethod
728
  def call_master_info(cls, node_list):
729
    """Query master info.
730

731
    This is a multi-node call.
732

733
    """
734
    # TODO: should this method query down nodes?
735
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
736

    
737
  @classmethod
738
  def call_version(cls, node_list):
739
    """Query node version.
740

741
    This is a multi-node call.
742

743
    """
744
    return cls._StaticMultiNodeCall(node_list, "version", [])
745

    
746
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
747
    """Request creation of a given block device.
748

749
    This is a single-node call.
750

751
    """
752
    return self._SingleNodeCall(node, "blockdev_create",
753
                                [bdev.ToDict(), size, owner, on_primary, info])
754

    
755
  def call_blockdev_remove(self, node, bdev):
756
    """Request removal of a given block device.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
762

    
763
  def call_blockdev_rename(self, node, devlist):
764
    """Request rename of the given block devices.
765

766
    This is a single-node call.
767

768
    """
769
    return self._SingleNodeCall(node, "blockdev_rename",
770
                                [(d.ToDict(), uid) for d, uid in devlist])
771

    
772
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
773
    """Request assembling of a given block device.
774

775
    This is a single-node call.
776

777
    """
778
    return self._SingleNodeCall(node, "blockdev_assemble",
779
                                [disk.ToDict(), owner, on_primary])
780

    
781
  def call_blockdev_shutdown(self, node, disk):
782
    """Request shutdown of a given block device.
783

784
    This is a single-node call.
785

786
    """
787
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
788

    
789
  def call_blockdev_addchildren(self, node, bdev, ndevs):
790
    """Request adding a list of children to a (mirroring) device.
791

792
    This is a single-node call.
793

794
    """
795
    return self._SingleNodeCall(node, "blockdev_addchildren",
796
                                [bdev.ToDict(),
797
                                 [disk.ToDict() for disk in ndevs]])
798

    
799
  def call_blockdev_removechildren(self, node, bdev, ndevs):
800
    """Request removing a list of children from a (mirroring) device.
801

802
    This is a single-node call.
803

804
    """
805
    return self._SingleNodeCall(node, "blockdev_removechildren",
806
                                [bdev.ToDict(),
807
                                 [disk.ToDict() for disk in ndevs]])
808

    
809
  def call_blockdev_getmirrorstatus(self, node, disks):
810
    """Request status of a (mirroring) device.
811

812
    This is a single-node call.
813

814
    """
815
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
816
                                  [dsk.ToDict() for dsk in disks])
817
    if not result.fail_msg:
818
      result.payload = [objects.BlockDevStatus.FromDict(i)
819
                        for i in result.payload]
820
    return result
821

    
822
  def call_blockdev_find(self, node, disk):
823
    """Request identification of a given block device.
824

825
    This is a single-node call.
826

827
    """
828
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
829
    if not result.fail_msg and result.payload is not None:
830
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
831
    return result
832

    
833
  def call_blockdev_close(self, node, instance_name, disks):
834
    """Closes the given block devices.
835

836
    This is a single-node call.
837

838
    """
839
    params = [instance_name, [cf.ToDict() for cf in disks]]
840
    return self._SingleNodeCall(node, "blockdev_close", params)
841

    
842
  def call_blockdev_getsizes(self, node, disks):
843
    """Returns the size of the given disks.
844

845
    This is a single-node call.
846

847
    """
848
    params = [[cf.ToDict() for cf in disks]]
849
    return self._SingleNodeCall(node, "blockdev_getsize", params)
850

    
851
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
852
    """Disconnects the network of the given drbd devices.
853

854
    This is a multi-node call.
855

856
    """
857
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
858
                               [nodes_ip, [cf.ToDict() for cf in disks]])
859

    
860
  def call_drbd_attach_net(self, node_list, nodes_ip,
861
                           disks, instance_name, multimaster):
862
    """Disconnects the given drbd devices.
863

864
    This is a multi-node call.
865

866
    """
867
    return self._MultiNodeCall(node_list, "drbd_attach_net",
868
                               [nodes_ip, [cf.ToDict() for cf in disks],
869
                                instance_name, multimaster])
870

    
871
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
872
    """Waits for the synchronization of drbd devices is complete.
873

874
    This is a multi-node call.
875

876
    """
877
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
878
                               [nodes_ip, [cf.ToDict() for cf in disks]])
879

    
880
  @classmethod
881
  def call_upload_file(cls, node_list, file_name, address_list=None):
882
    """Upload a file.
883

884
    The node will refuse the operation in case the file is not on the
885
    approved file list.
886

887
    This is a multi-node call.
888

889
    @type node_list: list
890
    @param node_list: the list of node names to upload to
891
    @type file_name: str
892
    @param file_name: the filename to upload
893
    @type address_list: list or None
894
    @keyword address_list: an optional list of node addresses, in order
895
        to optimize the RPC speed
896

897
    """
898
    file_contents = utils.ReadFile(file_name)
899
    data = cls._Compress(file_contents)
900
    st = os.stat(file_name)
901
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
902
              st.st_atime, st.st_mtime]
903
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
904
                                    address_list=address_list)
905

    
906
  @classmethod
907
  def call_write_ssconf_files(cls, node_list, values):
908
    """Write ssconf files.
909

910
    This is a multi-node call.
911

912
    """
913
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
914

    
915
  def call_os_diagnose(self, node_list):
916
    """Request a diagnose of OS definitions.
917

918
    This is a multi-node call.
919

920
    """
921
    return self._MultiNodeCall(node_list, "os_diagnose", [])
922

    
923
  def call_os_get(self, node, name):
924
    """Returns an OS definition.
925

926
    This is a single-node call.
927

928
    """
929
    result = self._SingleNodeCall(node, "os_get", [name])
930
    if not result.fail_msg and isinstance(result.payload, dict):
931
      result.payload = objects.OS.FromDict(result.payload)
932
    return result
933

    
934
  def call_hooks_runner(self, node_list, hpath, phase, env):
935
    """Call the hooks runner.
936

937
    Args:
938
      - op: the OpCode instance
939
      - env: a dictionary with the environment
940

941
    This is a multi-node call.
942

943
    """
944
    params = [hpath, phase, env]
945
    return self._MultiNodeCall(node_list, "hooks_runner", params)
946

    
947
  def call_iallocator_runner(self, node, name, idata):
948
    """Call an iallocator on a remote node
949

950
    Args:
951
      - name: the iallocator name
952
      - input: the json-encoded input string
953

954
    This is a single-node call.
955

956
    """
957
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
958

    
959
  def call_blockdev_grow(self, node, cf_bdev, amount):
960
    """Request a snapshot of the given block device.
961

962
    This is a single-node call.
963

964
    """
965
    return self._SingleNodeCall(node, "blockdev_grow",
966
                                [cf_bdev.ToDict(), amount])
967

    
968
  def call_blockdev_export(self, node, cf_bdev,
969
                           dest_node, dest_path, cluster_name):
970
    """Export a given disk to another node.
971

972
    This is a single-node call.
973

974
    """
975
    return self._SingleNodeCall(node, "blockdev_export",
976
                                [cf_bdev.ToDict(), dest_node, dest_path,
977
                                 cluster_name])
978

    
979
  def call_blockdev_snapshot(self, node, cf_bdev):
980
    """Request a snapshot of the given block device.
981

982
    This is a single-node call.
983

984
    """
985
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
986

    
987
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
988
                           cluster_name, idx):
989
    """Request the export of a given snapshot.
990

991
    This is a single-node call.
992

993
    """
994
    return self._SingleNodeCall(node, "snapshot_export",
995
                                [snap_bdev.ToDict(), dest_node,
996
                                 self._InstDict(instance), cluster_name, idx])
997

    
998
  def call_finalize_export(self, node, instance, snap_disks):
999
    """Request the completion of an export operation.
1000

1001
    This writes the export config file, etc.
1002

1003
    This is a single-node call.
1004

1005
    """
1006
    flat_disks = []
1007
    for disk in snap_disks:
1008
      if isinstance(disk, bool):
1009
        flat_disks.append(disk)
1010
      else:
1011
        flat_disks.append(disk.ToDict())
1012

    
1013
    return self._SingleNodeCall(node, "finalize_export",
1014
                                [self._InstDict(instance), flat_disks])
1015

    
1016
  def call_export_info(self, node, path):
1017
    """Queries the export information in a given path.
1018

1019
    This is a single-node call.
1020

1021
    """
1022
    return self._SingleNodeCall(node, "export_info", [path])
1023

    
1024
  def call_instance_os_import(self, node, inst, src_node, src_images,
1025
                              cluster_name):
1026
    """Request the import of a backup into an instance.
1027

1028
    This is a single-node call.
1029

1030
    """
1031
    return self._SingleNodeCall(node, "instance_os_import",
1032
                                [self._InstDict(inst), src_node, src_images,
1033
                                 cluster_name])
1034

    
1035
  def call_export_list(self, node_list):
1036
    """Gets the stored exports list.
1037

1038
    This is a multi-node call.
1039

1040
    """
1041
    return self._MultiNodeCall(node_list, "export_list", [])
1042

    
1043
  def call_export_remove(self, node, export):
1044
    """Requests removal of a given export.
1045

1046
    This is a single-node call.
1047

1048
    """
1049
    return self._SingleNodeCall(node, "export_remove", [export])
1050

    
1051
  @classmethod
1052
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1053
    """Requests a node to clean the cluster information it has.
1054

1055
    This will remove the configuration information from the ganeti data
1056
    dir.
1057

1058
    This is a single-node call.
1059

1060
    """
1061
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1062
                                     [modify_ssh_setup])
1063

    
1064
  def call_node_volumes(self, node_list):
1065
    """Gets all volumes on node(s).
1066

1067
    This is a multi-node call.
1068

1069
    """
1070
    return self._MultiNodeCall(node_list, "node_volumes", [])
1071

    
1072
  def call_node_demote_from_mc(self, node):
1073
    """Demote a node from the master candidate role.
1074

1075
    This is a single-node call.
1076

1077
    """
1078
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1079

    
1080

    
1081
  def call_node_powercycle(self, node, hypervisor):
1082
    """Tries to powercycle a node.
1083

1084
    This is a single-node call.
1085

1086
    """
1087
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1088

    
1089

    
1090
  def call_test_delay(self, node_list, duration):
1091
    """Sleep for a fixed time on given node(s).
1092

1093
    This is a multi-node call.
1094

1095
    """
1096
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1097

    
1098
  def call_file_storage_dir_create(self, node, file_storage_dir):
1099
    """Create the given file storage directory.
1100

1101
    This is a single-node call.
1102

1103
    """
1104
    return self._SingleNodeCall(node, "file_storage_dir_create",
1105
                                [file_storage_dir])
1106

    
1107
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1108
    """Remove the given file storage directory.
1109

1110
    This is a single-node call.
1111

1112
    """
1113
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1114
                                [file_storage_dir])
1115

    
1116
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1117
                                   new_file_storage_dir):
1118
    """Rename file storage directory.
1119

1120
    This is a single-node call.
1121

1122
    """
1123
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1124
                                [old_file_storage_dir, new_file_storage_dir])
1125

    
1126
  @classmethod
1127
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1128
    """Update job queue.
1129

1130
    This is a multi-node call.
1131

1132
    """
1133
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1134
                                    [file_name, cls._Compress(content)],
1135
                                    address_list=address_list)
1136

    
1137
  @classmethod
1138
  def call_jobqueue_purge(cls, node):
1139
    """Purge job queue.
1140

1141
    This is a single-node call.
1142

1143
    """
1144
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1145

    
1146
  @classmethod
1147
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1148
    """Rename a job queue file.
1149

1150
    This is a multi-node call.
1151

1152
    """
1153
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1154
                                    address_list=address_list)
1155

    
1156
  @classmethod
1157
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1158
    """Set the drain flag on the queue.
1159

1160
    This is a multi-node call.
1161

1162
    @type node_list: list
1163
    @param node_list: the list of nodes to query
1164
    @type drain_flag: bool
1165
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1166

1167
    """
1168
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1169
                                    [drain_flag])
1170

    
1171
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1172
    """Validate the hypervisor params.
1173

1174
    This is a multi-node call.
1175

1176
    @type node_list: list
1177
    @param node_list: the list of nodes to query
1178
    @type hvname: string
1179
    @param hvname: the hypervisor name
1180
    @type hvparams: dict
1181
    @param hvparams: the hypervisor parameters to be validated
1182

1183
    """
1184
    cluster = self._cfg.GetClusterInfo()
1185
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1186
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1187
                               [hvname, hv_full])