Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ eb630f50

History | View | Annotate | Download (37.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager # pylint: disable-msg=W0603
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  http.InitSsl()
64

    
65
  _http_manager = http.client.HttpClientManager()
66

    
67

    
68
def Shutdown():
69
  """Stops the module-global HTTP client manager.
70

71
  Must be called before quitting the program.
72

73
  """
74
  global _http_manager # pylint: disable-msg=W0603
75

    
76
  if _http_manager:
77
    _http_manager.Shutdown()
78
    _http_manager = None
79

    
80

    
81
class RpcResult(object):
82
  """RPC Result class.
83

84
  This class holds an RPC result. It is needed since in multi-node
85
  calls we can't raise an exception just because one one out of many
86
  failed, and therefore we use this class to encapsulate the result.
87

88
  @ivar data: the data payload, for successful results, or None
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.offline = offline
101
    self.call = call
102
    self.node = node
103

    
104
    if offline:
105
      self.fail_msg = "Node is marked offline"
106
      self.data = self.payload = None
107
    elif failed:
108
      self.fail_msg = self._EnsureErr(data)
109
      self.data = self.payload = None
110
    else:
111
      self.data = data
112
      if not isinstance(self.data, (tuple, list)):
113
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114
                         type(self.data))
115
        self.payload = None
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
        self.payload = None
120
      elif not self.data[0]:
121
        self.fail_msg = self._EnsureErr(self.data[1])
122
        self.payload = None
123
      else:
124
        # finally success
125
        self.fail_msg = None
126
        self.payload = data[1]
127

    
128
    assert hasattr(self, "call")
129
    assert hasattr(self, "data")
130
    assert hasattr(self, "fail_msg")
131
    assert hasattr(self, "node")
132
    assert hasattr(self, "offline")
133
    assert hasattr(self, "payload")
134

    
135
  @staticmethod
136
  def _EnsureErr(val):
137
    """Helper to ensure we return a 'True' value for error."""
138
    if val:
139
      return val
140
    else:
141
      return "No error information"
142

    
143
  def Raise(self, msg, prereq=False, ecode=None):
144
    """If the result has failed, raise an OpExecError.
145

146
    This is used so that LU code doesn't have to check for each
147
    result, but instead can call this function.
148

149
    """
150
    if not self.fail_msg:
151
      return
152

    
153
    if not msg: # one could pass None for default message
154
      msg = ("Call '%s' to node '%s' has failed: %s" %
155
             (self.call, self.node, self.fail_msg))
156
    else:
157
      msg = "%s: %s" % (msg, self.fail_msg)
158
    if prereq:
159
      ec = errors.OpPrereqError
160
    else:
161
      ec = errors.OpExecError
162
    if ecode is not None:
163
      args = (msg, prereq)
164
    else:
165
      args = (msg, )
166
    raise ec(*args) # pylint: disable-msg=W0142
167

    
168

    
169
class Client:
170
  """RPC Client class.
171

172
  This class, given a (remote) method name, a list of parameters and a
173
  list of nodes, will contact (in parallel) all nodes, and return a
174
  dict of results (key: node name, value: result).
175

176
  One current bug is that generic failure is still signaled by
177
  'False' result, which is not good. This overloading of values can
178
  cause bugs.
179

180
  """
181
  def __init__(self, procedure, body, port):
182
    self.procedure = procedure
183
    self.body = body
184
    self.port = port
185
    self.nc = {}
186

    
187
    self._ssl_params = \
188
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189
                         ssl_cert_path=constants.NODED_CERT_FILE)
190

    
191
  def ConnectList(self, node_list, address_list=None):
192
    """Add a list of nodes to the target nodes.
193

194
    @type node_list: list
195
    @param node_list: the list of node names to connect
196
    @type address_list: list or None
197
    @keyword address_list: either None or a list with node addresses,
198
        which must have the same length as the node list
199

200
    """
201
    if address_list is None:
202
      address_list = [None for _ in node_list]
203
    else:
204
      assert len(node_list) == len(address_list), \
205
             "Name and address lists should have the same length"
206
    for node, address in zip(node_list, address_list):
207
      self.ConnectNode(node, address)
208

    
209
  def ConnectNode(self, name, address=None):
210
    """Add a node to the target list.
211

212
    @type name: str
213
    @param name: the node name
214
    @type address: str
215
    @keyword address: the node address, if known
216

217
    """
218
    if address is None:
219
      address = name
220

    
221
    self.nc[name] = \
222
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223
                                    "/%s" % self.procedure,
224
                                    post_data=self.body,
225
                                    ssl_params=self._ssl_params,
226
                                    ssl_verify_peer=True)
227

    
228
  def GetResults(self):
229
    """Call nodes and return results.
230

231
    @rtype: list
232
    @return: List of RPC results
233

234
    """
235
    assert _http_manager, "RPC module not initialized"
236

    
237
    _http_manager.ExecRequests(self.nc.values())
238

    
239
    results = {}
240

    
241
    for name, req in self.nc.iteritems():
242
      if req.success and req.resp_status_code == http.HTTP_OK:
243
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244
                                  node=name, call=self.procedure)
245
        continue
246

    
247
      # TODO: Better error reporting
248
      if req.error:
249
        msg = req.error
250
      else:
251
        msg = req.resp_body
252

    
253
      logging.error("RPC error in %s from node %s: %s",
254
                    self.procedure, name, msg)
255
      results[name] = RpcResult(data=msg, failed=True, node=name,
256
                                call=self.procedure)
257

    
258
    return results
259

    
260

    
261
def _EncodeImportExportIO(ieio, ieioargs):
262
  """Encodes import/export I/O information.
263

264
  """
265
  if ieio == constants.IEIO_RAW_DISK:
266
    assert len(ieioargs) == 1
267
    return (ieioargs[0].ToDict(), )
268

    
269
  if ieio == constants.IEIO_SCRIPT:
270
    assert len(ieioargs) == 2
271
    return (ieioargs[0].ToDict(), ieioargs[1])
272

    
273
  return ieioargs
274

    
275

    
276
class RpcRunner(object):
277
  """RPC runner class"""
278

    
279
  def __init__(self, cfg):
280
    """Initialized the rpc runner.
281

282
    @type cfg:  C{config.ConfigWriter}
283
    @param cfg: the configuration object that will be used to get data
284
                about the cluster
285

286
    """
287
    self._cfg = cfg
288
    self.port = utils.GetDaemonPort(constants.NODED)
289

    
290
  def _InstDict(self, instance, hvp=None, bep=None):
291
    """Convert the given instance to a dict.
292

293
    This is done via the instance's ToDict() method and additionally
294
    we fill the hvparams with the cluster defaults.
295

296
    @type instance: L{objects.Instance}
297
    @param instance: an Instance object
298
    @type hvp: dict or None
299
    @param hvp: a dictionary with overridden hypervisor parameters
300
    @type bep: dict or None
301
    @param bep: a dictionary with overridden backend parameters
302
    @rtype: dict
303
    @return: the instance dict, with the hvparams filled with the
304
        cluster defaults
305

306
    """
307
    idict = instance.ToDict()
308
    cluster = self._cfg.GetClusterInfo()
309
    idict["hvparams"] = cluster.FillHV(instance)
310
    if hvp is not None:
311
      idict["hvparams"].update(hvp)
312
    idict["beparams"] = cluster.FillBE(instance)
313
    if bep is not None:
314
      idict["beparams"].update(bep)
315
    for nic in idict["nics"]:
316
      nic['nicparams'] = objects.FillDict(
317
        cluster.nicparams[constants.PP_DEFAULT],
318
        nic['nicparams'])
319
    return idict
320

    
321
  def _ConnectList(self, client, node_list, call):
322
    """Helper for computing node addresses.
323

324
    @type client: L{ganeti.rpc.Client}
325
    @param client: a C{Client} instance
326
    @type node_list: list
327
    @param node_list: the node list we should connect
328
    @type call: string
329
    @param call: the name of the remote procedure call, for filling in
330
        correctly any eventual offline nodes' results
331

332
    """
333
    all_nodes = self._cfg.GetAllNodesInfo()
334
    name_list = []
335
    addr_list = []
336
    skip_dict = {}
337
    for node in node_list:
338
      if node in all_nodes:
339
        if all_nodes[node].offline:
340
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
341
          continue
342
        val = all_nodes[node].primary_ip
343
      else:
344
        val = None
345
      addr_list.append(val)
346
      name_list.append(node)
347
    if name_list:
348
      client.ConnectList(name_list, address_list=addr_list)
349
    return skip_dict
350

    
351
  def _ConnectNode(self, client, node, call):
352
    """Helper for computing one node's address.
353

354
    @type client: L{ganeti.rpc.Client}
355
    @param client: a C{Client} instance
356
    @type node: str
357
    @param node: the node we should connect
358
    @type call: string
359
    @param call: the name of the remote procedure call, for filling in
360
        correctly any eventual offline nodes' results
361

362
    """
363
    node_info = self._cfg.GetNodeInfo(node)
364
    if node_info is not None:
365
      if node_info.offline:
366
        return RpcResult(node=node, offline=True, call=call)
367
      addr = node_info.primary_ip
368
    else:
369
      addr = None
370
    client.ConnectNode(node, address=addr)
371

    
372
  def _MultiNodeCall(self, node_list, procedure, args):
373
    """Helper for making a multi-node call
374

375
    """
376
    body = serializer.DumpJson(args, indent=False)
377
    c = Client(procedure, body, self.port)
378
    skip_dict = self._ConnectList(c, node_list, procedure)
379
    skip_dict.update(c.GetResults())
380
    return skip_dict
381

    
382
  @classmethod
383
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
384
                           address_list=None):
385
    """Helper for making a multi-node static call
386

387
    """
388
    body = serializer.DumpJson(args, indent=False)
389
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390
    c.ConnectList(node_list, address_list=address_list)
391
    return c.GetResults()
392

    
393
  def _SingleNodeCall(self, node, procedure, args):
394
    """Helper for making a single-node call
395

396
    """
397
    body = serializer.DumpJson(args, indent=False)
398
    c = Client(procedure, body, self.port)
399
    result = self._ConnectNode(c, node, procedure)
400
    if result is None:
401
      # we did connect, node is not offline
402
      result = c.GetResults()[node]
403
    return result
404

    
405
  @classmethod
406
  def _StaticSingleNodeCall(cls, node, procedure, args):
407
    """Helper for making a single-node static call
408

409
    """
410
    body = serializer.DumpJson(args, indent=False)
411
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
412
    c.ConnectNode(node)
413
    return c.GetResults()[node]
414

    
415
  @staticmethod
416
  def _Compress(data):
417
    """Compresses a string for transport over RPC.
418

419
    Small amounts of data are not compressed.
420

421
    @type data: str
422
    @param data: Data
423
    @rtype: tuple
424
    @return: Encoded data to send
425

426
    """
427
    # Small amounts of data are not compressed
428
    if len(data) < 512:
429
      return (constants.RPC_ENCODING_NONE, data)
430

    
431
    # Compress with zlib and encode in base64
432
    return (constants.RPC_ENCODING_ZLIB_BASE64,
433
            base64.b64encode(zlib.compress(data, 3)))
434

    
435
  #
436
  # Begin RPC calls
437
  #
438

    
439
  def call_lv_list(self, node_list, vg_name):
440
    """Gets the logical volumes present in a given volume group.
441

442
    This is a multi-node call.
443

444
    """
445
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
446

    
447
  def call_vg_list(self, node_list):
448
    """Gets the volume group list.
449

450
    This is a multi-node call.
451

452
    """
453
    return self._MultiNodeCall(node_list, "vg_list", [])
454

    
455
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
456
    """Get list of storage units.
457

458
    This is a multi-node call.
459

460
    """
461
    return self._MultiNodeCall(node_list, "storage_list",
462
                               [su_name, su_args, name, fields])
463

    
464
  def call_storage_modify(self, node, su_name, su_args, name, changes):
465
    """Modify a storage unit.
466

467
    This is a single-node call.
468

469
    """
470
    return self._SingleNodeCall(node, "storage_modify",
471
                                [su_name, su_args, name, changes])
472

    
473
  def call_storage_execute(self, node, su_name, su_args, name, op):
474
    """Executes an operation on a storage unit.
475

476
    This is a single-node call.
477

478
    """
479
    return self._SingleNodeCall(node, "storage_execute",
480
                                [su_name, su_args, name, op])
481

    
482
  def call_bridges_exist(self, node, bridges_list):
483
    """Checks if a node has all the bridges given.
484

485
    This method checks if all bridges given in the bridges_list are
486
    present on the remote node, so that an instance that uses interfaces
487
    on those bridges can be started.
488

489
    This is a single-node call.
490

491
    """
492
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
493

    
494
  def call_instance_start(self, node, instance, hvp, bep):
495
    """Starts an instance.
496

497
    This is a single-node call.
498

499
    """
500
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
501
    return self._SingleNodeCall(node, "instance_start", [idict])
502

    
503
  def call_instance_shutdown(self, node, instance, timeout):
504
    """Stops an instance.
505

506
    This is a single-node call.
507

508
    """
509
    return self._SingleNodeCall(node, "instance_shutdown",
510
                                [self._InstDict(instance), timeout])
511

    
512
  def call_migration_info(self, node, instance):
513
    """Gather the information necessary to prepare an instance migration.
514

515
    This is a single-node call.
516

517
    @type node: string
518
    @param node: the node on which the instance is currently running
519
    @type instance: C{objects.Instance}
520
    @param instance: the instance definition
521

522
    """
523
    return self._SingleNodeCall(node, "migration_info",
524
                                [self._InstDict(instance)])
525

    
526
  def call_accept_instance(self, node, instance, info, target):
527
    """Prepare a node to accept an instance.
528

529
    This is a single-node call.
530

531
    @type node: string
532
    @param node: the target node for the migration
533
    @type instance: C{objects.Instance}
534
    @param instance: the instance definition
535
    @type info: opaque/hypervisor specific (string/data)
536
    @param info: result for the call_migration_info call
537
    @type target: string
538
    @param target: target hostname (usually ip address) (on the node itself)
539

540
    """
541
    return self._SingleNodeCall(node, "accept_instance",
542
                                [self._InstDict(instance), info, target])
543

    
544
  def call_finalize_migration(self, node, instance, info, success):
545
    """Finalize any target-node migration specific operation.
546

547
    This is called both in case of a successful migration and in case of error
548
    (in which case it should abort the migration).
549

550
    This is a single-node call.
551

552
    @type node: string
553
    @param node: the target node for the migration
554
    @type instance: C{objects.Instance}
555
    @param instance: the instance definition
556
    @type info: opaque/hypervisor specific (string/data)
557
    @param info: result for the call_migration_info call
558
    @type success: boolean
559
    @param success: whether the migration was a success or a failure
560

561
    """
562
    return self._SingleNodeCall(node, "finalize_migration",
563
                                [self._InstDict(instance), info, success])
564

    
565
  def call_instance_migrate(self, node, instance, target, live):
566
    """Migrate an instance.
567

568
    This is a single-node call.
569

570
    @type node: string
571
    @param node: the node on which the instance is currently running
572
    @type instance: C{objects.Instance}
573
    @param instance: the instance definition
574
    @type target: string
575
    @param target: the target node name
576
    @type live: boolean
577
    @param live: whether the migration should be done live or not (the
578
        interpretation of this parameter is left to the hypervisor)
579

580
    """
581
    return self._SingleNodeCall(node, "instance_migrate",
582
                                [self._InstDict(instance), target, live])
583

    
584
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
585
    """Reboots an instance.
586

587
    This is a single-node call.
588

589
    """
590
    return self._SingleNodeCall(node, "instance_reboot",
591
                                [self._InstDict(inst), reboot_type,
592
                                 shutdown_timeout])
593

    
594
  def call_instance_os_add(self, node, inst, reinstall, debug):
595
    """Installs an OS on the given instance.
596

597
    This is a single-node call.
598

599
    """
600
    return self._SingleNodeCall(node, "instance_os_add",
601
                                [self._InstDict(inst), reinstall, debug])
602

    
603
  def call_instance_run_rename(self, node, inst, old_name, debug):
604
    """Run the OS rename script for an instance.
605

606
    This is a single-node call.
607

608
    """
609
    return self._SingleNodeCall(node, "instance_run_rename",
610
                                [self._InstDict(inst), old_name, debug])
611

    
612
  def call_instance_info(self, node, instance, hname):
613
    """Returns information about a single instance.
614

615
    This is a single-node call.
616

617
    @type node: list
618
    @param node: the list of nodes to query
619
    @type instance: string
620
    @param instance: the instance name
621
    @type hname: string
622
    @param hname: the hypervisor type of the instance
623

624
    """
625
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
626

    
627
  def call_instance_migratable(self, node, instance):
628
    """Checks whether the given instance can be migrated.
629

630
    This is a single-node call.
631

632
    @param node: the node to query
633
    @type instance: L{objects.Instance}
634
    @param instance: the instance to check
635

636

637
    """
638
    return self._SingleNodeCall(node, "instance_migratable",
639
                                [self._InstDict(instance)])
640

    
641
  def call_all_instances_info(self, node_list, hypervisor_list):
642
    """Returns information about all instances on the given nodes.
643

644
    This is a multi-node call.
645

646
    @type node_list: list
647
    @param node_list: the list of nodes to query
648
    @type hypervisor_list: list
649
    @param hypervisor_list: the hypervisors to query for instances
650

651
    """
652
    return self._MultiNodeCall(node_list, "all_instances_info",
653
                               [hypervisor_list])
654

    
655
  def call_instance_list(self, node_list, hypervisor_list):
656
    """Returns the list of running instances on a given node.
657

658
    This is a multi-node call.
659

660
    @type node_list: list
661
    @param node_list: the list of nodes to query
662
    @type hypervisor_list: list
663
    @param hypervisor_list: the hypervisors to query for instances
664

665
    """
666
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
667

    
668
  def call_node_tcp_ping(self, node, source, target, port, timeout,
669
                         live_port_needed):
670
    """Do a TcpPing on the remote node
671

672
    This is a single-node call.
673

674
    """
675
    return self._SingleNodeCall(node, "node_tcp_ping",
676
                                [source, target, port, timeout,
677
                                 live_port_needed])
678

    
679
  def call_node_has_ip_address(self, node, address):
680
    """Checks if a node has the given IP address.
681

682
    This is a single-node call.
683

684
    """
685
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
686

    
687
  def call_node_info(self, node_list, vg_name, hypervisor_type):
688
    """Return node information.
689

690
    This will return memory information and volume group size and free
691
    space.
692

693
    This is a multi-node call.
694

695
    @type node_list: list
696
    @param node_list: the list of nodes to query
697
    @type vg_name: C{string}
698
    @param vg_name: the name of the volume group to ask for disk space
699
        information
700
    @type hypervisor_type: C{str}
701
    @param hypervisor_type: the name of the hypervisor to ask for
702
        memory information
703

704
    """
705
    return self._MultiNodeCall(node_list, "node_info",
706
                               [vg_name, hypervisor_type])
707

    
708
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
709
    """Add a node to the cluster.
710

711
    This is a single-node call.
712

713
    """
714
    return self._SingleNodeCall(node, "node_add",
715
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
716

    
717
  def call_node_verify(self, node_list, checkdict, cluster_name):
718
    """Request verification of given parameters.
719

720
    This is a multi-node call.
721

722
    """
723
    return self._MultiNodeCall(node_list, "node_verify",
724
                               [checkdict, cluster_name])
725

    
726
  @classmethod
727
  def call_node_start_master(cls, node, start_daemons, no_voting):
728
    """Tells a node to activate itself as a master.
729

730
    This is a single-node call.
731

732
    """
733
    return cls._StaticSingleNodeCall(node, "node_start_master",
734
                                     [start_daemons, no_voting])
735

    
736
  @classmethod
737
  def call_node_stop_master(cls, node, stop_daemons):
738
    """Tells a node to demote itself from master status.
739

740
    This is a single-node call.
741

742
    """
743
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
744

    
745
  @classmethod
746
  def call_master_info(cls, node_list):
747
    """Query master info.
748

749
    This is a multi-node call.
750

751
    """
752
    # TODO: should this method query down nodes?
753
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
754

    
755
  @classmethod
756
  def call_version(cls, node_list):
757
    """Query node version.
758

759
    This is a multi-node call.
760

761
    """
762
    return cls._StaticMultiNodeCall(node_list, "version", [])
763

    
764
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
765
    """Request creation of a given block device.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "blockdev_create",
771
                                [bdev.ToDict(), size, owner, on_primary, info])
772

    
773
  def call_blockdev_remove(self, node, bdev):
774
    """Request removal of a given block device.
775

776
    This is a single-node call.
777

778
    """
779
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
780

    
781
  def call_blockdev_rename(self, node, devlist):
782
    """Request rename of the given block devices.
783

784
    This is a single-node call.
785

786
    """
787
    return self._SingleNodeCall(node, "blockdev_rename",
788
                                [(d.ToDict(), uid) for d, uid in devlist])
789

    
790
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
791
    """Request assembling of a given block device.
792

793
    This is a single-node call.
794

795
    """
796
    return self._SingleNodeCall(node, "blockdev_assemble",
797
                                [disk.ToDict(), owner, on_primary])
798

    
799
  def call_blockdev_shutdown(self, node, disk):
800
    """Request shutdown of a given block device.
801

802
    This is a single-node call.
803

804
    """
805
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
806

    
807
  def call_blockdev_addchildren(self, node, bdev, ndevs):
808
    """Request adding a list of children to a (mirroring) device.
809

810
    This is a single-node call.
811

812
    """
813
    return self._SingleNodeCall(node, "blockdev_addchildren",
814
                                [bdev.ToDict(),
815
                                 [disk.ToDict() for disk in ndevs]])
816

    
817
  def call_blockdev_removechildren(self, node, bdev, ndevs):
818
    """Request removing a list of children from a (mirroring) device.
819

820
    This is a single-node call.
821

822
    """
823
    return self._SingleNodeCall(node, "blockdev_removechildren",
824
                                [bdev.ToDict(),
825
                                 [disk.ToDict() for disk in ndevs]])
826

    
827
  def call_blockdev_getmirrorstatus(self, node, disks):
828
    """Request status of a (mirroring) device.
829

830
    This is a single-node call.
831

832
    """
833
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
834
                                  [dsk.ToDict() for dsk in disks])
835
    if not result.fail_msg:
836
      result.payload = [objects.BlockDevStatus.FromDict(i)
837
                        for i in result.payload]
838
    return result
839

    
840
  def call_blockdev_find(self, node, disk):
841
    """Request identification of a given block device.
842

843
    This is a single-node call.
844

845
    """
846
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
847
    if not result.fail_msg and result.payload is not None:
848
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
849
    return result
850

    
851
  def call_blockdev_close(self, node, instance_name, disks):
852
    """Closes the given block devices.
853

854
    This is a single-node call.
855

856
    """
857
    params = [instance_name, [cf.ToDict() for cf in disks]]
858
    return self._SingleNodeCall(node, "blockdev_close", params)
859

    
860
  def call_blockdev_getsizes(self, node, disks):
861
    """Returns the size of the given disks.
862

863
    This is a single-node call.
864

865
    """
866
    params = [[cf.ToDict() for cf in disks]]
867
    return self._SingleNodeCall(node, "blockdev_getsize", params)
868

    
869
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
870
    """Disconnects the network of the given drbd devices.
871

872
    This is a multi-node call.
873

874
    """
875
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
876
                               [nodes_ip, [cf.ToDict() for cf in disks]])
877

    
878
  def call_drbd_attach_net(self, node_list, nodes_ip,
879
                           disks, instance_name, multimaster):
880
    """Disconnects the given drbd devices.
881

882
    This is a multi-node call.
883

884
    """
885
    return self._MultiNodeCall(node_list, "drbd_attach_net",
886
                               [nodes_ip, [cf.ToDict() for cf in disks],
887
                                instance_name, multimaster])
888

    
889
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
890
    """Waits for the synchronization of drbd devices is complete.
891

892
    This is a multi-node call.
893

894
    """
895
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
896
                               [nodes_ip, [cf.ToDict() for cf in disks]])
897

    
898
  @classmethod
899
  def call_upload_file(cls, node_list, file_name, address_list=None):
900
    """Upload a file.
901

902
    The node will refuse the operation in case the file is not on the
903
    approved file list.
904

905
    This is a multi-node call.
906

907
    @type node_list: list
908
    @param node_list: the list of node names to upload to
909
    @type file_name: str
910
    @param file_name: the filename to upload
911
    @type address_list: list or None
912
    @keyword address_list: an optional list of node addresses, in order
913
        to optimize the RPC speed
914

915
    """
916
    file_contents = utils.ReadFile(file_name)
917
    data = cls._Compress(file_contents)
918
    st = os.stat(file_name)
919
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
920
              st.st_atime, st.st_mtime]
921
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
922
                                    address_list=address_list)
923

    
924
  @classmethod
925
  def call_write_ssconf_files(cls, node_list, values):
926
    """Write ssconf files.
927

928
    This is a multi-node call.
929

930
    """
931
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
932

    
933
  def call_os_diagnose(self, node_list):
934
    """Request a diagnose of OS definitions.
935

936
    This is a multi-node call.
937

938
    """
939
    return self._MultiNodeCall(node_list, "os_diagnose", [])
940

    
941
  def call_os_get(self, node, name):
942
    """Returns an OS definition.
943

944
    This is a single-node call.
945

946
    """
947
    result = self._SingleNodeCall(node, "os_get", [name])
948
    if not result.fail_msg and isinstance(result.payload, dict):
949
      result.payload = objects.OS.FromDict(result.payload)
950
    return result
951

    
952
  def call_hooks_runner(self, node_list, hpath, phase, env):
953
    """Call the hooks runner.
954

955
    Args:
956
      - op: the OpCode instance
957
      - env: a dictionary with the environment
958

959
    This is a multi-node call.
960

961
    """
962
    params = [hpath, phase, env]
963
    return self._MultiNodeCall(node_list, "hooks_runner", params)
964

    
965
  def call_iallocator_runner(self, node, name, idata):
966
    """Call an iallocator on a remote node
967

968
    Args:
969
      - name: the iallocator name
970
      - input: the json-encoded input string
971

972
    This is a single-node call.
973

974
    """
975
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
976

    
977
  def call_blockdev_grow(self, node, cf_bdev, amount):
978
    """Request a snapshot of the given block device.
979

980
    This is a single-node call.
981

982
    """
983
    return self._SingleNodeCall(node, "blockdev_grow",
984
                                [cf_bdev.ToDict(), amount])
985

    
986
  def call_blockdev_export(self, node, cf_bdev,
987
                           dest_node, dest_path, cluster_name):
988
    """Export a given disk to another node.
989

990
    This is a single-node call.
991

992
    """
993
    return self._SingleNodeCall(node, "blockdev_export",
994
                                [cf_bdev.ToDict(), dest_node, dest_path,
995
                                 cluster_name])
996

    
997
  def call_blockdev_snapshot(self, node, cf_bdev):
998
    """Request a snapshot of the given block device.
999

1000
    This is a single-node call.
1001

1002
    """
1003
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1004

    
1005
  def call_finalize_export(self, node, instance, snap_disks):
1006
    """Request the completion of an export operation.
1007

1008
    This writes the export config file, etc.
1009

1010
    This is a single-node call.
1011

1012
    """
1013
    flat_disks = []
1014
    for disk in snap_disks:
1015
      if isinstance(disk, bool):
1016
        flat_disks.append(disk)
1017
      else:
1018
        flat_disks.append(disk.ToDict())
1019

    
1020
    return self._SingleNodeCall(node, "finalize_export",
1021
                                [self._InstDict(instance), flat_disks])
1022

    
1023
  def call_export_info(self, node, path):
1024
    """Queries the export information in a given path.
1025

1026
    This is a single-node call.
1027

1028
    """
1029
    return self._SingleNodeCall(node, "export_info", [path])
1030

    
1031
  def call_export_list(self, node_list):
1032
    """Gets the stored exports list.
1033

1034
    This is a multi-node call.
1035

1036
    """
1037
    return self._MultiNodeCall(node_list, "export_list", [])
1038

    
1039
  def call_export_remove(self, node, export):
1040
    """Requests removal of a given export.
1041

1042
    This is a single-node call.
1043

1044
    """
1045
    return self._SingleNodeCall(node, "export_remove", [export])
1046

    
1047
  @classmethod
1048
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1049
    """Requests a node to clean the cluster information it has.
1050

1051
    This will remove the configuration information from the ganeti data
1052
    dir.
1053

1054
    This is a single-node call.
1055

1056
    """
1057
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1058
                                     [modify_ssh_setup])
1059

    
1060
  def call_node_volumes(self, node_list):
1061
    """Gets all volumes on node(s).
1062

1063
    This is a multi-node call.
1064

1065
    """
1066
    return self._MultiNodeCall(node_list, "node_volumes", [])
1067

    
1068
  def call_node_demote_from_mc(self, node):
1069
    """Demote a node from the master candidate role.
1070

1071
    This is a single-node call.
1072

1073
    """
1074
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1075

    
1076
  def call_node_powercycle(self, node, hypervisor):
1077
    """Tries to powercycle a node.
1078

1079
    This is a single-node call.
1080

1081
    """
1082
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1083

    
1084
  def call_test_delay(self, node_list, duration):
1085
    """Sleep for a fixed time on given node(s).
1086

1087
    This is a multi-node call.
1088

1089
    """
1090
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1091

    
1092
  def call_file_storage_dir_create(self, node, file_storage_dir):
1093
    """Create the given file storage directory.
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    return self._SingleNodeCall(node, "file_storage_dir_create",
1099
                                [file_storage_dir])
1100

    
1101
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1102
    """Remove the given file storage directory.
1103

1104
    This is a single-node call.
1105

1106
    """
1107
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1108
                                [file_storage_dir])
1109

    
1110
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1111
                                   new_file_storage_dir):
1112
    """Rename file storage directory.
1113

1114
    This is a single-node call.
1115

1116
    """
1117
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1118
                                [old_file_storage_dir, new_file_storage_dir])
1119

    
1120
  @classmethod
1121
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1122
    """Update job queue.
1123

1124
    This is a multi-node call.
1125

1126
    """
1127
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1128
                                    [file_name, cls._Compress(content)],
1129
                                    address_list=address_list)
1130

    
1131
  @classmethod
1132
  def call_jobqueue_purge(cls, node):
1133
    """Purge job queue.
1134

1135
    This is a single-node call.
1136

1137
    """
1138
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1139

    
1140
  @classmethod
1141
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1142
    """Rename a job queue file.
1143

1144
    This is a multi-node call.
1145

1146
    """
1147
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1148
                                    address_list=address_list)
1149

    
1150
  @classmethod
1151
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1152
    """Set the drain flag on the queue.
1153

1154
    This is a multi-node call.
1155

1156
    @type node_list: list
1157
    @param node_list: the list of nodes to query
1158
    @type drain_flag: bool
1159
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1160

1161
    """
1162
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1163
                                    [drain_flag])
1164

    
1165
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1166
    """Validate the hypervisor params.
1167

1168
    This is a multi-node call.
1169

1170
    @type node_list: list
1171
    @param node_list: the list of nodes to query
1172
    @type hvname: string
1173
    @param hvname: the hypervisor name
1174
    @type hvparams: dict
1175
    @param hvparams: the hypervisor parameters to be validated
1176

1177
    """
1178
    cluster = self._cfg.GetClusterInfo()
1179
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1180
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1181
                               [hvname, hv_full])
1182

    
1183
  def call_x509_cert_create(self, node, validity):
1184
    """Creates a new X509 certificate for SSL/TLS.
1185

1186
    This is a single-node call.
1187

1188
    @type validity: int
1189
    @param validity: Validity in seconds
1190

1191
    """
1192
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1193

    
1194
  def call_x509_cert_remove(self, node, name):
1195
    """Removes a X509 certificate.
1196

1197
    This is a single-node call.
1198

1199
    @type name: string
1200
    @param name: Certificate name
1201

1202
    """
1203
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1204

    
1205
  def call_import_start(self, node, opts, instance, dest, dest_args):
1206
    """Starts a listener for an import.
1207

1208
    This is a single-node call.
1209

1210
    @type node: string
1211
    @param node: Node name
1212
    @type instance: C{objects.Instance}
1213
    @param instance: Instance object
1214

1215
    """
1216
    return self._SingleNodeCall(node, "import_start",
1217
                                [opts.ToDict(),
1218
                                 self._InstDict(instance), dest,
1219
                                 _EncodeImportExportIO(dest, dest_args)])
1220

    
1221
  def call_export_start(self, node, opts, host, port,
1222
                        instance, source, source_args):
1223
    """Starts an export daemon.
1224

1225
    This is a single-node call.
1226

1227
    @type node: string
1228
    @param node: Node name
1229
    @type instance: C{objects.Instance}
1230
    @param instance: Instance object
1231

1232
    """
1233
    return self._SingleNodeCall(node, "export_start",
1234
                                [opts.ToDict(), host, port,
1235
                                 self._InstDict(instance), source,
1236
                                 _EncodeImportExportIO(source, source_args)])
1237

    
1238
  def call_impexp_status(self, node, names):
1239
    """Gets the status of an import or export.
1240

1241
    This is a single-node call.
1242

1243
    @type node: string
1244
    @param node: Node name
1245
    @type names: List of strings
1246
    @param names: Import/export names
1247
    @rtype: List of L{objects.ImportExportStatus} instances
1248
    @return: Returns a list of the state of each named import/export or None if
1249
             a status couldn't be retrieved
1250

1251
    """
1252
    result = self._SingleNodeCall(node, "impexp_status", [names])
1253

    
1254
    if not result.fail_msg:
1255
      decoded = []
1256

    
1257
      for i in result.payload:
1258
        if i is None:
1259
          decoded.append(None)
1260
          continue
1261
        decoded.append(objects.ImportExportStatus.FromDict(i))
1262

    
1263
      result.payload = decoded
1264

    
1265
    return result
1266

    
1267
  def call_impexp_abort(self, node, name):
1268
    """Aborts an import or export.
1269

1270
    This is a single-node call.
1271

1272
    @type node: string
1273
    @param node: Node name
1274
    @type name: string
1275
    @param name: Import/export name
1276

1277
    """
1278
    return self._SingleNodeCall(node, "impexp_abort", [name])
1279

    
1280
  def call_impexp_cleanup(self, node, name):
1281
    """Cleans up after an import or export.
1282

1283
    This is a single-node call.
1284

1285
    @type node: string
1286
    @param node: Node name
1287
    @type name: string
1288
    @param name: Import/export name
1289

1290
    """
1291
    return self._SingleNodeCall(node, "impexp_cleanup", [name])