Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ f95c81bf

History | View | Annotate | Download (33.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @type failed: boolean
87
  @ivar failed: whether the operation failed at RPC level (not
88
      application level on the remote node)
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95

96
  """
97
  def __init__(self, data=None, failed=False, offline=False,
98
               call=None, node=None):
99
    self.failed = failed
100
    self.offline = offline
101
    self.call = call
102
    self.node = node
103
    if offline:
104
      self.failed = True
105
      self.error = "Node is marked offline"
106
      self.data = self.payload = None
107
    elif failed:
108
      self.error = data
109
      self.data = self.payload = None
110
    else:
111
      self.data = data
112
      self.error = None
113
      if isinstance(data, (tuple, list)) and len(data) == 2:
114
        self.payload = data[1]
115
      else:
116
        self.payload = None
117

    
118
  def Raise(self):
119
    """If the result has failed, raise an OpExecError.
120

121
    This is used so that LU code doesn't have to check for each
122
    result, but instead can call this function.
123

124
    """
125
    if self.failed:
126
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
127
                               (self.call, self.node, self.error))
128

    
129
  def RemoteFailMsg(self):
130
    """Check if the remote procedure failed.
131

132
    This is valid only for RPC calls which return result of the form
133
    (status, data | error_msg).
134

135
    @return: empty string for succcess, otherwise an error message
136

137
    """
138
    def _EnsureErr(val):
139
      """Helper to ensure we return a 'True' value for error."""
140
      if val:
141
        return val
142
      else:
143
        return "No error information"
144

    
145
    if self.failed:
146
      return _EnsureErr(self.error)
147
    if not isinstance(self.data, (tuple, list)):
148
      return "Invalid result type (%s)" % type(self.data)
149
    if len(self.data) != 2:
150
      return "Invalid result length (%d), expected 2" % len(self.data)
151
    if not self.data[0]:
152
      return _EnsureErr(self.data[1])
153
    return ""
154

    
155

    
156
class Client:
157
  """RPC Client class.
158

159
  This class, given a (remote) method name, a list of parameters and a
160
  list of nodes, will contact (in parallel) all nodes, and return a
161
  dict of results (key: node name, value: result).
162

163
  One current bug is that generic failure is still signaled by
164
  'False' result, which is not good. This overloading of values can
165
  cause bugs.
166

167
  """
168
  def __init__(self, procedure, body, port):
169
    self.procedure = procedure
170
    self.body = body
171
    self.port = port
172
    self.nc = {}
173

    
174
    self._ssl_params = \
175
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
176
                         ssl_cert_path=constants.SSL_CERT_FILE)
177

    
178
  def ConnectList(self, node_list, address_list=None):
179
    """Add a list of nodes to the target nodes.
180

181
    @type node_list: list
182
    @param node_list: the list of node names to connect
183
    @type address_list: list or None
184
    @keyword address_list: either None or a list with node addresses,
185
        which must have the same length as the node list
186

187
    """
188
    if address_list is None:
189
      address_list = [None for _ in node_list]
190
    else:
191
      assert len(node_list) == len(address_list), \
192
             "Name and address lists should have the same length"
193
    for node, address in zip(node_list, address_list):
194
      self.ConnectNode(node, address)
195

    
196
  def ConnectNode(self, name, address=None):
197
    """Add a node to the target list.
198

199
    @type name: str
200
    @param name: the node name
201
    @type address: str
202
    @keyword address: the node address, if known
203

204
    """
205
    if address is None:
206
      address = name
207

    
208
    self.nc[name] = \
209
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
210
                                    "/%s" % self.procedure,
211
                                    post_data=self.body,
212
                                    ssl_params=self._ssl_params,
213
                                    ssl_verify_peer=True)
214

    
215
  def GetResults(self):
216
    """Call nodes and return results.
217

218
    @rtype: list
219
    @return: List of RPC results
220

221
    """
222
    assert _http_manager, "RPC module not initialized"
223

    
224
    _http_manager.ExecRequests(self.nc.values())
225

    
226
    results = {}
227

    
228
    for name, req in self.nc.iteritems():
229
      if req.success and req.resp_status_code == http.HTTP_OK:
230
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
231
                                  node=name, call=self.procedure)
232
        continue
233

    
234
      # TODO: Better error reporting
235
      if req.error:
236
        msg = req.error
237
      else:
238
        msg = req.resp_body
239

    
240
      logging.error("RPC error in %s from node %s: %s",
241
                    self.procedure, name, msg)
242
      results[name] = RpcResult(data=msg, failed=True, node=name,
243
                                call=self.procedure)
244

    
245
    return results
246

    
247

    
248
class RpcRunner(object):
249
  """RPC runner class"""
250

    
251
  def __init__(self, cfg):
252
    """Initialized the rpc runner.
253

254
    @type cfg:  C{config.ConfigWriter}
255
    @param cfg: the configuration object that will be used to get data
256
                about the cluster
257

258
    """
259
    self._cfg = cfg
260
    self.port = utils.GetNodeDaemonPort()
261

    
262
  def _InstDict(self, instance, hvp=None, bep=None):
263
    """Convert the given instance to a dict.
264

265
    This is done via the instance's ToDict() method and additionally
266
    we fill the hvparams with the cluster defaults.
267

268
    @type instance: L{objects.Instance}
269
    @param instance: an Instance object
270
    @type hvp: dict or None
271
    @param hvp: a dictionary with overridden hypervisor parameters
272
    @type bep: dict or None
273
    @param bep: a dictionary with overridden backend parameters
274
    @rtype: dict
275
    @return: the instance dict, with the hvparams filled with the
276
        cluster defaults
277

278
    """
279
    idict = instance.ToDict()
280
    cluster = self._cfg.GetClusterInfo()
281
    idict["hvparams"] = cluster.FillHV(instance)
282
    if hvp is not None:
283
      idict["hvparams"].update(hvp)
284
    idict["beparams"] = cluster.FillBE(instance)
285
    if bep is not None:
286
      idict["beparams"].update(bep)
287
    return idict
288

    
289
  def _ConnectList(self, client, node_list, call):
290
    """Helper for computing node addresses.
291

292
    @type client: L{ganeti.rpc.Client}
293
    @param client: a C{Client} instance
294
    @type node_list: list
295
    @param node_list: the node list we should connect
296
    @type call: string
297
    @param call: the name of the remote procedure call, for filling in
298
        correctly any eventual offline nodes' results
299

300
    """
301
    all_nodes = self._cfg.GetAllNodesInfo()
302
    name_list = []
303
    addr_list = []
304
    skip_dict = {}
305
    for node in node_list:
306
      if node in all_nodes:
307
        if all_nodes[node].offline:
308
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
309
          continue
310
        val = all_nodes[node].primary_ip
311
      else:
312
        val = None
313
      addr_list.append(val)
314
      name_list.append(node)
315
    if name_list:
316
      client.ConnectList(name_list, address_list=addr_list)
317
    return skip_dict
318

    
319
  def _ConnectNode(self, client, node, call):
320
    """Helper for computing one node's address.
321

322
    @type client: L{ganeti.rpc.Client}
323
    @param client: a C{Client} instance
324
    @type node: str
325
    @param node: the node we should connect
326
    @type call: string
327
    @param call: the name of the remote procedure call, for filling in
328
        correctly any eventual offline nodes' results
329

330
    """
331
    node_info = self._cfg.GetNodeInfo(node)
332
    if node_info is not None:
333
      if node_info.offline:
334
        return RpcResult(node=node, offline=True, call=call)
335
      addr = node_info.primary_ip
336
    else:
337
      addr = None
338
    client.ConnectNode(node, address=addr)
339

    
340
  def _MultiNodeCall(self, node_list, procedure, args):
341
    """Helper for making a multi-node call
342

343
    """
344
    body = serializer.DumpJson(args, indent=False)
345
    c = Client(procedure, body, self.port)
346
    skip_dict = self._ConnectList(c, node_list, procedure)
347
    skip_dict.update(c.GetResults())
348
    return skip_dict
349

    
350
  @classmethod
351
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
352
                           address_list=None):
353
    """Helper for making a multi-node static call
354

355
    """
356
    body = serializer.DumpJson(args, indent=False)
357
    c = Client(procedure, body, utils.GetNodeDaemonPort())
358
    c.ConnectList(node_list, address_list=address_list)
359
    return c.GetResults()
360

    
361
  def _SingleNodeCall(self, node, procedure, args):
362
    """Helper for making a single-node call
363

364
    """
365
    body = serializer.DumpJson(args, indent=False)
366
    c = Client(procedure, body, self.port)
367
    result = self._ConnectNode(c, node, procedure)
368
    if result is None:
369
      # we did connect, node is not offline
370
      result = c.GetResults()[node]
371
    return result
372

    
373
  @classmethod
374
  def _StaticSingleNodeCall(cls, node, procedure, args):
375
    """Helper for making a single-node static call
376

377
    """
378
    body = serializer.DumpJson(args, indent=False)
379
    c = Client(procedure, body, utils.GetNodeDaemonPort())
380
    c.ConnectNode(node)
381
    return c.GetResults()[node]
382

    
383
  @staticmethod
384
  def _Compress(data):
385
    """Compresses a string for transport over RPC.
386

387
    Small amounts of data are not compressed.
388

389
    @type data: str
390
    @param data: Data
391
    @rtype: tuple
392
    @return: Encoded data to send
393

394
    """
395
    # Small amounts of data are not compressed
396
    if len(data) < 512:
397
      return (constants.RPC_ENCODING_NONE, data)
398

    
399
    # Compress with zlib and encode in base64
400
    return (constants.RPC_ENCODING_ZLIB_BASE64,
401
            base64.b64encode(zlib.compress(data, 3)))
402

    
403
  #
404
  # Begin RPC calls
405
  #
406

    
407
  def call_volume_list(self, node_list, vg_name):
408
    """Gets the logical volumes present in a given volume group.
409

410
    This is a multi-node call.
411

412
    """
413
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
414

    
415
  def call_vg_list(self, node_list):
416
    """Gets the volume group list.
417

418
    This is a multi-node call.
419

420
    """
421
    return self._MultiNodeCall(node_list, "vg_list", [])
422

    
423
  def call_bridges_exist(self, node, bridges_list):
424
    """Checks if a node has all the bridges given.
425

426
    This method checks if all bridges given in the bridges_list are
427
    present on the remote node, so that an instance that uses interfaces
428
    on those bridges can be started.
429

430
    This is a single-node call.
431

432
    """
433
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
434

    
435
  def call_instance_start(self, node, instance, hvp, bep):
436
    """Starts an instance.
437

438
    This is a single-node call.
439

440
    """
441
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
442
    return self._SingleNodeCall(node, "instance_start", [idict])
443

    
444
  def call_instance_shutdown(self, node, instance):
445
    """Stops an instance.
446

447
    This is a single-node call.
448

449
    """
450
    return self._SingleNodeCall(node, "instance_shutdown",
451
                                [self._InstDict(instance)])
452

    
453
  def call_migration_info(self, node, instance):
454
    """Gather the information necessary to prepare an instance migration.
455

456
    This is a single-node call.
457

458
    @type node: string
459
    @param node: the node on which the instance is currently running
460
    @type instance: C{objects.Instance}
461
    @param instance: the instance definition
462

463
    """
464
    return self._SingleNodeCall(node, "migration_info",
465
                                [self._InstDict(instance)])
466

    
467
  def call_accept_instance(self, node, instance, info, target):
468
    """Prepare a node to accept an instance.
469

470
    This is a single-node call.
471

472
    @type node: string
473
    @param node: the target node for the migration
474
    @type instance: C{objects.Instance}
475
    @param instance: the instance definition
476
    @type info: opaque/hypervisor specific (string/data)
477
    @param info: result for the call_migration_info call
478
    @type target: string
479
    @param target: target hostname (usually ip address) (on the node itself)
480

481
    """
482
    return self._SingleNodeCall(node, "accept_instance",
483
                                [self._InstDict(instance), info, target])
484

    
485
  def call_finalize_migration(self, node, instance, info, success):
486
    """Finalize any target-node migration specific operation.
487

488
    This is called both in case of a successful migration and in case of error
489
    (in which case it should abort the migration).
490

491
    This is a single-node call.
492

493
    @type node: string
494
    @param node: the target node for the migration
495
    @type instance: C{objects.Instance}
496
    @param instance: the instance definition
497
    @type info: opaque/hypervisor specific (string/data)
498
    @param info: result for the call_migration_info call
499
    @type success: boolean
500
    @param success: whether the migration was a success or a failure
501

502
    """
503
    return self._SingleNodeCall(node, "finalize_migration",
504
                                [self._InstDict(instance), info, success])
505

    
506
  def call_instance_migrate(self, node, instance, target, live):
507
    """Migrate an instance.
508

509
    This is a single-node call.
510

511
    @type node: string
512
    @param node: the node on which the instance is currently running
513
    @type instance: C{objects.Instance}
514
    @param instance: the instance definition
515
    @type target: string
516
    @param target: the target node name
517
    @type live: boolean
518
    @param live: whether the migration should be done live or not (the
519
        interpretation of this parameter is left to the hypervisor)
520

521
    """
522
    return self._SingleNodeCall(node, "instance_migrate",
523
                                [self._InstDict(instance), target, live])
524

    
525
  def call_instance_reboot(self, node, instance, reboot_type):
526
    """Reboots an instance.
527

528
    This is a single-node call.
529

530
    """
531
    return self._SingleNodeCall(node, "instance_reboot",
532
                                [self._InstDict(instance), reboot_type])
533

    
534
  def call_instance_os_add(self, node, inst):
535
    """Installs an OS on the given instance.
536

537
    This is a single-node call.
538

539
    """
540
    return self._SingleNodeCall(node, "instance_os_add",
541
                                [self._InstDict(inst)])
542

    
543
  def call_instance_run_rename(self, node, inst, old_name):
544
    """Run the OS rename script for an instance.
545

546
    This is a single-node call.
547

548
    """
549
    return self._SingleNodeCall(node, "instance_run_rename",
550
                                [self._InstDict(inst), old_name])
551

    
552
  def call_instance_info(self, node, instance, hname):
553
    """Returns information about a single instance.
554

555
    This is a single-node call.
556

557
    @type node: list
558
    @param node: the list of nodes to query
559
    @type instance: string
560
    @param instance: the instance name
561
    @type hname: string
562
    @param hname: the hypervisor type of the instance
563

564
    """
565
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
566

    
567
  def call_instance_migratable(self, node, instance):
568
    """Checks whether the given instance can be migrated.
569

570
    This is a single-node call.
571

572
    @param node: the node to query
573
    @type instance: L{objects.Instance}
574
    @param instance: the instance to check
575

576

577
    """
578
    return self._SingleNodeCall(node, "instance_migratable",
579
                                [self._InstDict(instance)])
580

    
581
  def call_all_instances_info(self, node_list, hypervisor_list):
582
    """Returns information about all instances on the given nodes.
583

584
    This is a multi-node call.
585

586
    @type node_list: list
587
    @param node_list: the list of nodes to query
588
    @type hypervisor_list: list
589
    @param hypervisor_list: the hypervisors to query for instances
590

591
    """
592
    return self._MultiNodeCall(node_list, "all_instances_info",
593
                               [hypervisor_list])
594

    
595
  def call_instance_list(self, node_list, hypervisor_list):
596
    """Returns the list of running instances on a given node.
597

598
    This is a multi-node call.
599

600
    @type node_list: list
601
    @param node_list: the list of nodes to query
602
    @type hypervisor_list: list
603
    @param hypervisor_list: the hypervisors to query for instances
604

605
    """
606
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
607

    
608
  def call_node_tcp_ping(self, node, source, target, port, timeout,
609
                         live_port_needed):
610
    """Do a TcpPing on the remote node
611

612
    This is a single-node call.
613

614
    """
615
    return self._SingleNodeCall(node, "node_tcp_ping",
616
                                [source, target, port, timeout,
617
                                 live_port_needed])
618

    
619
  def call_node_has_ip_address(self, node, address):
620
    """Checks if a node has the given IP address.
621

622
    This is a single-node call.
623

624
    """
625
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
626

    
627
  def call_node_info(self, node_list, vg_name, hypervisor_type):
628
    """Return node information.
629

630
    This will return memory information and volume group size and free
631
    space.
632

633
    This is a multi-node call.
634

635
    @type node_list: list
636
    @param node_list: the list of nodes to query
637
    @type vg_name: C{string}
638
    @param vg_name: the name of the volume group to ask for disk space
639
        information
640
    @type hypervisor_type: C{str}
641
    @param hypervisor_type: the name of the hypervisor to ask for
642
        memory information
643

644
    """
645
    retux = self._MultiNodeCall(node_list, "node_info",
646
                                [vg_name, hypervisor_type])
647

    
648
    for result in retux.itervalues():
649
      if result.failed or not isinstance(result.data, dict):
650
        result.data = {}
651
      if result.offline:
652
        log_name = None
653
      else:
654
        log_name = "call_node_info"
655

    
656
      utils.CheckDict(result.data, {
657
        'memory_total' : '-',
658
        'memory_dom0' : '-',
659
        'memory_free' : '-',
660
        'vg_size' : 'node_unreachable',
661
        'vg_free' : '-',
662
        }, log_name)
663
    return retux
664

    
665
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
666
    """Add a node to the cluster.
667

668
    This is a single-node call.
669

670
    """
671
    return self._SingleNodeCall(node, "node_add",
672
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
673

    
674
  def call_node_verify(self, node_list, checkdict, cluster_name):
675
    """Request verification of given parameters.
676

677
    This is a multi-node call.
678

679
    """
680
    return self._MultiNodeCall(node_list, "node_verify",
681
                               [checkdict, cluster_name])
682

    
683
  @classmethod
684
  def call_node_start_master(cls, node, start_daemons, no_voting):
685
    """Tells a node to activate itself as a master.
686

687
    This is a single-node call.
688

689
    """
690
    return cls._StaticSingleNodeCall(node, "node_start_master",
691
                                     [start_daemons, no_voting])
692

    
693
  @classmethod
694
  def call_node_stop_master(cls, node, stop_daemons):
695
    """Tells a node to demote itself from master status.
696

697
    This is a single-node call.
698

699
    """
700
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
701

    
702
  @classmethod
703
  def call_master_info(cls, node_list):
704
    """Query master info.
705

706
    This is a multi-node call.
707

708
    """
709
    # TODO: should this method query down nodes?
710
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
711

    
712
  def call_version(self, node_list):
713
    """Query node version.
714

715
    This is a multi-node call.
716

717
    """
718
    return self._MultiNodeCall(node_list, "version", [])
719

    
720
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
721
    """Request creation of a given block device.
722

723
    This is a single-node call.
724

725
    """
726
    return self._SingleNodeCall(node, "blockdev_create",
727
                                [bdev.ToDict(), size, owner, on_primary, info])
728

    
729
  def call_blockdev_remove(self, node, bdev):
730
    """Request removal of a given block device.
731

732
    This is a single-node call.
733

734
    """
735
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
736

    
737
  def call_blockdev_rename(self, node, devlist):
738
    """Request rename of the given block devices.
739

740
    This is a single-node call.
741

742
    """
743
    return self._SingleNodeCall(node, "blockdev_rename",
744
                                [(d.ToDict(), uid) for d, uid in devlist])
745

    
746
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
747
    """Request assembling of a given block device.
748

749
    This is a single-node call.
750

751
    """
752
    return self._SingleNodeCall(node, "blockdev_assemble",
753
                                [disk.ToDict(), owner, on_primary])
754

    
755
  def call_blockdev_shutdown(self, node, disk):
756
    """Request shutdown of a given block device.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
762

    
763
  def call_blockdev_addchildren(self, node, bdev, ndevs):
764
    """Request adding a list of children to a (mirroring) device.
765

766
    This is a single-node call.
767

768
    """
769
    return self._SingleNodeCall(node, "blockdev_addchildren",
770
                                [bdev.ToDict(),
771
                                 [disk.ToDict() for disk in ndevs]])
772

    
773
  def call_blockdev_removechildren(self, node, bdev, ndevs):
774
    """Request removing a list of children from a (mirroring) device.
775

776
    This is a single-node call.
777

778
    """
779
    return self._SingleNodeCall(node, "blockdev_removechildren",
780
                                [bdev.ToDict(),
781
                                 [disk.ToDict() for disk in ndevs]])
782

    
783
  def call_blockdev_getmirrorstatus(self, node, disks):
784
    """Request status of a (mirroring) device.
785

786
    This is a single-node call.
787

788
    """
789
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
790
                                [dsk.ToDict() for dsk in disks])
791

    
792
  def call_blockdev_find(self, node, disk):
793
    """Request identification of a given block device.
794

795
    This is a single-node call.
796

797
    """
798
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
799

    
800
  def call_blockdev_close(self, node, instance_name, disks):
801
    """Closes the given block devices.
802

803
    This is a single-node call.
804

805
    """
806
    params = [instance_name, [cf.ToDict() for cf in disks]]
807
    return self._SingleNodeCall(node, "blockdev_close", params)
808

    
809
  def call_blockdev_getsizes(self, node, disks):
810
    """Returns the size of the given disks.
811

812
    This is a single-node call.
813

814
    """
815
    params = [[cf.ToDict() for cf in disks]]
816
    return self._SingleNodeCall(node, "blockdev_getsize", params)
817

    
818
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
819
    """Disconnects the network of the given drbd devices.
820

821
    This is a multi-node call.
822

823
    """
824
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
825
                               [nodes_ip, [cf.ToDict() for cf in disks]])
826

    
827
  def call_drbd_attach_net(self, node_list, nodes_ip,
828
                           disks, instance_name, multimaster):
829
    """Disconnects the given drbd devices.
830

831
    This is a multi-node call.
832

833
    """
834
    return self._MultiNodeCall(node_list, "drbd_attach_net",
835
                               [nodes_ip, [cf.ToDict() for cf in disks],
836
                                instance_name, multimaster])
837

    
838
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
839
    """Waits for the synchronization of drbd devices is complete.
840

841
    This is a multi-node call.
842

843
    """
844
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
845
                               [nodes_ip, [cf.ToDict() for cf in disks]])
846

    
847
  @classmethod
848
  def call_upload_file(cls, node_list, file_name, address_list=None):
849
    """Upload a file.
850

851
    The node will refuse the operation in case the file is not on the
852
    approved file list.
853

854
    This is a multi-node call.
855

856
    @type node_list: list
857
    @param node_list: the list of node names to upload to
858
    @type file_name: str
859
    @param file_name: the filename to upload
860
    @type address_list: list or None
861
    @keyword address_list: an optional list of node addresses, in order
862
        to optimize the RPC speed
863

864
    """
865
    file_contents = utils.ReadFile(file_name)
866
    data = cls._Compress(file_contents)
867
    st = os.stat(file_name)
868
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
869
              st.st_atime, st.st_mtime]
870
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
871
                                    address_list=address_list)
872

    
873
  @classmethod
874
  def call_write_ssconf_files(cls, node_list, values):
875
    """Write ssconf files.
876

877
    This is a multi-node call.
878

879
    """
880
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
881

    
882
  def call_os_diagnose(self, node_list):
883
    """Request a diagnose of OS definitions.
884

885
    This is a multi-node call.
886

887
    """
888
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
889

    
890
    for node_result in result.values():
891
      if not node_result.failed and node_result.data:
892
        node_result.data = [objects.OS.FromDict(oss)
893
                            for oss in node_result.data]
894
    return result
895

    
896
  def call_os_get(self, node, name):
897
    """Returns an OS definition.
898

899
    This is a single-node call.
900

901
    """
902
    result = self._SingleNodeCall(node, "os_get", [name])
903
    if not result.failed and isinstance(result.data, dict):
904
      result.data = objects.OS.FromDict(result.data)
905
    return result
906

    
907
  def call_hooks_runner(self, node_list, hpath, phase, env):
908
    """Call the hooks runner.
909

910
    Args:
911
      - op: the OpCode instance
912
      - env: a dictionary with the environment
913

914
    This is a multi-node call.
915

916
    """
917
    params = [hpath, phase, env]
918
    return self._MultiNodeCall(node_list, "hooks_runner", params)
919

    
920
  def call_iallocator_runner(self, node, name, idata):
921
    """Call an iallocator on a remote node
922

923
    Args:
924
      - name: the iallocator name
925
      - input: the json-encoded input string
926

927
    This is a single-node call.
928

929
    """
930
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
931

    
932
  def call_blockdev_grow(self, node, cf_bdev, amount):
933
    """Request a snapshot of the given block device.
934

935
    This is a single-node call.
936

937
    """
938
    return self._SingleNodeCall(node, "blockdev_grow",
939
                                [cf_bdev.ToDict(), amount])
940

    
941
  def call_blockdev_snapshot(self, node, cf_bdev):
942
    """Request a snapshot of the given block device.
943

944
    This is a single-node call.
945

946
    """
947
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
948

    
949
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
950
                           cluster_name, idx):
951
    """Request the export of a given snapshot.
952

953
    This is a single-node call.
954

955
    """
956
    return self._SingleNodeCall(node, "snapshot_export",
957
                                [snap_bdev.ToDict(), dest_node,
958
                                 self._InstDict(instance), cluster_name, idx])
959

    
960
  def call_finalize_export(self, node, instance, snap_disks):
961
    """Request the completion of an export operation.
962

963
    This writes the export config file, etc.
964

965
    This is a single-node call.
966

967
    """
968
    flat_disks = []
969
    for disk in snap_disks:
970
      if isinstance(disk, bool):
971
        flat_disks.append(disk)
972
      else:
973
        flat_disks.append(disk.ToDict())
974

    
975
    return self._SingleNodeCall(node, "finalize_export",
976
                                [self._InstDict(instance), flat_disks])
977

    
978
  def call_export_info(self, node, path):
979
    """Queries the export information in a given path.
980

981
    This is a single-node call.
982

983
    """
984
    result = self._SingleNodeCall(node, "export_info", [path])
985
    if not result.failed and result.data:
986
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
987
    return result
988

    
989
  def call_instance_os_import(self, node, inst, src_node, src_images,
990
                              cluster_name):
991
    """Request the import of a backup into an instance.
992

993
    This is a single-node call.
994

995
    """
996
    return self._SingleNodeCall(node, "instance_os_import",
997
                                [self._InstDict(inst), src_node, src_images,
998
                                 cluster_name])
999

    
1000
  def call_export_list(self, node_list):
1001
    """Gets the stored exports list.
1002

1003
    This is a multi-node call.
1004

1005
    """
1006
    return self._MultiNodeCall(node_list, "export_list", [])
1007

    
1008
  def call_export_remove(self, node, export):
1009
    """Requests removal of a given export.
1010

1011
    This is a single-node call.
1012

1013
    """
1014
    return self._SingleNodeCall(node, "export_remove", [export])
1015

    
1016
  @classmethod
1017
  def call_node_leave_cluster(cls, node):
1018
    """Requests a node to clean the cluster information it has.
1019

1020
    This will remove the configuration information from the ganeti data
1021
    dir.
1022

1023
    This is a single-node call.
1024

1025
    """
1026
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1027

    
1028
  def call_node_volumes(self, node_list):
1029
    """Gets all volumes on node(s).
1030

1031
    This is a multi-node call.
1032

1033
    """
1034
    return self._MultiNodeCall(node_list, "node_volumes", [])
1035

    
1036
  def call_node_demote_from_mc(self, node):
1037
    """Demote a node from the master candidate role.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1043

    
1044
  def call_test_delay(self, node_list, duration):
1045
    """Sleep for a fixed time on given node(s).
1046

1047
    This is a multi-node call.
1048

1049
    """
1050
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1051

    
1052
  def call_file_storage_dir_create(self, node, file_storage_dir):
1053
    """Create the given file storage directory.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    return self._SingleNodeCall(node, "file_storage_dir_create",
1059
                                [file_storage_dir])
1060

    
1061
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1062
    """Remove the given file storage directory.
1063

1064
    This is a single-node call.
1065

1066
    """
1067
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1068
                                [file_storage_dir])
1069

    
1070
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1071
                                   new_file_storage_dir):
1072
    """Rename file storage directory.
1073

1074
    This is a single-node call.
1075

1076
    """
1077
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1078
                                [old_file_storage_dir, new_file_storage_dir])
1079

    
1080
  @classmethod
1081
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1082
    """Update job queue.
1083

1084
    This is a multi-node call.
1085

1086
    """
1087
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1088
                                    [file_name, cls._Compress(content)],
1089
                                    address_list=address_list)
1090

    
1091
  @classmethod
1092
  def call_jobqueue_purge(cls, node):
1093
    """Purge job queue.
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1099

    
1100
  @classmethod
1101
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1102
    """Rename a job queue file.
1103

1104
    This is a multi-node call.
1105

1106
    """
1107
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1108
                                    address_list=address_list)
1109

    
1110
  @classmethod
1111
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1112
    """Set the drain flag on the queue.
1113

1114
    This is a multi-node call.
1115

1116
    @type node_list: list
1117
    @param node_list: the list of nodes to query
1118
    @type drain_flag: bool
1119
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1120

1121
    """
1122
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1123
                                    [drain_flag])
1124

    
1125
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1126
    """Validate the hypervisor params.
1127

1128
    This is a multi-node call.
1129

1130
    @type node_list: list
1131
    @param node_list: the list of nodes to query
1132
    @type hvname: string
1133
    @param hvname: the hypervisor name
1134
    @type hvparams: dict
1135
    @param hvparams: the hypervisor parameters to be validated
1136

1137
    """
1138
    cluster = self._cfg.GetClusterInfo()
1139
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1140
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1141
                               [hvname, hv_full])