Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 37549316

History | View | Annotate | Download (38.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager # pylint: disable-msg=W0603
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  http.InitSsl()
64

    
65
  _http_manager = http.client.HttpClientManager()
66

    
67

    
68
def Shutdown():
69
  """Stops the module-global HTTP client manager.
70

71
  Must be called before quitting the program.
72

73
  """
74
  global _http_manager # pylint: disable-msg=W0603
75

    
76
  if _http_manager:
77
    _http_manager.Shutdown()
78
    _http_manager = None
79

    
80

    
81
class RpcResult(object):
82
  """RPC Result class.
83

84
  This class holds an RPC result. It is needed since in multi-node
85
  calls we can't raise an exception just because one one out of many
86
  failed, and therefore we use this class to encapsulate the result.
87

88
  @ivar data: the data payload, for successful results, or None
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.offline = offline
101
    self.call = call
102
    self.node = node
103

    
104
    if offline:
105
      self.fail_msg = "Node is marked offline"
106
      self.data = self.payload = None
107
    elif failed:
108
      self.fail_msg = self._EnsureErr(data)
109
      self.data = self.payload = None
110
    else:
111
      self.data = data
112
      if not isinstance(self.data, (tuple, list)):
113
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114
                         type(self.data))
115
        self.payload = None
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
        self.payload = None
120
      elif not self.data[0]:
121
        self.fail_msg = self._EnsureErr(self.data[1])
122
        self.payload = None
123
      else:
124
        # finally success
125
        self.fail_msg = None
126
        self.payload = data[1]
127

    
128
    assert hasattr(self, "call")
129
    assert hasattr(self, "data")
130
    assert hasattr(self, "fail_msg")
131
    assert hasattr(self, "node")
132
    assert hasattr(self, "offline")
133
    assert hasattr(self, "payload")
134

    
135
  @staticmethod
136
  def _EnsureErr(val):
137
    """Helper to ensure we return a 'True' value for error."""
138
    if val:
139
      return val
140
    else:
141
      return "No error information"
142

    
143
  def Raise(self, msg, prereq=False, ecode=None):
144
    """If the result has failed, raise an OpExecError.
145

146
    This is used so that LU code doesn't have to check for each
147
    result, but instead can call this function.
148

149
    """
150
    if not self.fail_msg:
151
      return
152

    
153
    if not msg: # one could pass None for default message
154
      msg = ("Call '%s' to node '%s' has failed: %s" %
155
             (self.call, self.node, self.fail_msg))
156
    else:
157
      msg = "%s: %s" % (msg, self.fail_msg)
158
    if prereq:
159
      ec = errors.OpPrereqError
160
    else:
161
      ec = errors.OpExecError
162
    if ecode is not None:
163
      args = (msg, prereq)
164
    else:
165
      args = (msg, )
166
    raise ec(*args) # pylint: disable-msg=W0142
167

    
168

    
169
class Client:
170
  """RPC Client class.
171

172
  This class, given a (remote) method name, a list of parameters and a
173
  list of nodes, will contact (in parallel) all nodes, and return a
174
  dict of results (key: node name, value: result).
175

176
  One current bug is that generic failure is still signaled by
177
  'False' result, which is not good. This overloading of values can
178
  cause bugs.
179

180
  """
181
  def __init__(self, procedure, body, port):
182
    self.procedure = procedure
183
    self.body = body
184
    self.port = port
185
    self.nc = {}
186

    
187
    self._ssl_params = \
188
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189
                         ssl_cert_path=constants.NODED_CERT_FILE)
190

    
191
  def ConnectList(self, node_list, address_list=None):
192
    """Add a list of nodes to the target nodes.
193

194
    @type node_list: list
195
    @param node_list: the list of node names to connect
196
    @type address_list: list or None
197
    @keyword address_list: either None or a list with node addresses,
198
        which must have the same length as the node list
199

200
    """
201
    if address_list is None:
202
      address_list = [None for _ in node_list]
203
    else:
204
      assert len(node_list) == len(address_list), \
205
             "Name and address lists should have the same length"
206
    for node, address in zip(node_list, address_list):
207
      self.ConnectNode(node, address)
208

    
209
  def ConnectNode(self, name, address=None):
210
    """Add a node to the target list.
211

212
    @type name: str
213
    @param name: the node name
214
    @type address: str
215
    @keyword address: the node address, if known
216

217
    """
218
    if address is None:
219
      address = name
220

    
221
    self.nc[name] = \
222
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223
                                    "/%s" % self.procedure,
224
                                    post_data=self.body,
225
                                    ssl_params=self._ssl_params,
226
                                    ssl_verify_peer=True)
227

    
228
  def GetResults(self):
229
    """Call nodes and return results.
230

231
    @rtype: list
232
    @return: List of RPC results
233

234
    """
235
    assert _http_manager, "RPC module not initialized"
236

    
237
    _http_manager.ExecRequests(self.nc.values())
238

    
239
    results = {}
240

    
241
    for name, req in self.nc.iteritems():
242
      if req.success and req.resp_status_code == http.HTTP_OK:
243
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244
                                  node=name, call=self.procedure)
245
        continue
246

    
247
      # TODO: Better error reporting
248
      if req.error:
249
        msg = req.error
250
      else:
251
        msg = req.resp_body
252

    
253
      logging.error("RPC error in %s from node %s: %s",
254
                    self.procedure, name, msg)
255
      results[name] = RpcResult(data=msg, failed=True, node=name,
256
                                call=self.procedure)
257

    
258
    return results
259

    
260

    
261
def _EncodeImportExportIO(ieio, ieioargs):
262
  """Encodes import/export I/O information.
263

264
  """
265
  if ieio == constants.IEIO_RAW_DISK:
266
    assert len(ieioargs) == 1
267
    return (ieioargs[0].ToDict(), )
268

    
269
  if ieio == constants.IEIO_SCRIPT:
270
    assert len(ieioargs) == 2
271
    return (ieioargs[0].ToDict(), ieioargs[1])
272

    
273
  return ieioargs
274

    
275

    
276
class RpcRunner(object):
277
  """RPC runner class"""
278

    
279
  def __init__(self, cfg):
280
    """Initialized the rpc runner.
281

282
    @type cfg:  C{config.ConfigWriter}
283
    @param cfg: the configuration object that will be used to get data
284
                about the cluster
285

286
    """
287
    self._cfg = cfg
288
    self.port = utils.GetDaemonPort(constants.NODED)
289

    
290
  def _InstDict(self, instance, hvp=None, bep=None):
291
    """Convert the given instance to a dict.
292

293
    This is done via the instance's ToDict() method and additionally
294
    we fill the hvparams with the cluster defaults.
295

296
    @type instance: L{objects.Instance}
297
    @param instance: an Instance object
298
    @type hvp: dict or None
299
    @param hvp: a dictionary with overridden hypervisor parameters
300
    @type bep: dict or None
301
    @param bep: a dictionary with overridden backend parameters
302
    @rtype: dict
303
    @return: the instance dict, with the hvparams filled with the
304
        cluster defaults
305

306
    """
307
    idict = instance.ToDict()
308
    cluster = self._cfg.GetClusterInfo()
309
    idict["hvparams"] = cluster.FillHV(instance)
310
    if hvp is not None:
311
      idict["hvparams"].update(hvp)
312
    idict["beparams"] = cluster.FillBE(instance)
313
    if bep is not None:
314
      idict["beparams"].update(bep)
315
    for nic in idict["nics"]:
316
      nic['nicparams'] = objects.FillDict(
317
        cluster.nicparams[constants.PP_DEFAULT],
318
        nic['nicparams'])
319
    return idict
320

    
321
  def _ConnectList(self, client, node_list, call):
322
    """Helper for computing node addresses.
323

324
    @type client: L{ganeti.rpc.Client}
325
    @param client: a C{Client} instance
326
    @type node_list: list
327
    @param node_list: the node list we should connect
328
    @type call: string
329
    @param call: the name of the remote procedure call, for filling in
330
        correctly any eventual offline nodes' results
331

332
    """
333
    all_nodes = self._cfg.GetAllNodesInfo()
334
    name_list = []
335
    addr_list = []
336
    skip_dict = {}
337
    for node in node_list:
338
      if node in all_nodes:
339
        if all_nodes[node].offline:
340
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
341
          continue
342
        val = all_nodes[node].primary_ip
343
      else:
344
        val = None
345
      addr_list.append(val)
346
      name_list.append(node)
347
    if name_list:
348
      client.ConnectList(name_list, address_list=addr_list)
349
    return skip_dict
350

    
351
  def _ConnectNode(self, client, node, call):
352
    """Helper for computing one node's address.
353

354
    @type client: L{ganeti.rpc.Client}
355
    @param client: a C{Client} instance
356
    @type node: str
357
    @param node: the node we should connect
358
    @type call: string
359
    @param call: the name of the remote procedure call, for filling in
360
        correctly any eventual offline nodes' results
361

362
    """
363
    node_info = self._cfg.GetNodeInfo(node)
364
    if node_info is not None:
365
      if node_info.offline:
366
        return RpcResult(node=node, offline=True, call=call)
367
      addr = node_info.primary_ip
368
    else:
369
      addr = None
370
    client.ConnectNode(node, address=addr)
371

    
372
  def _MultiNodeCall(self, node_list, procedure, args):
373
    """Helper for making a multi-node call
374

375
    """
376
    body = serializer.DumpJson(args, indent=False)
377
    c = Client(procedure, body, self.port)
378
    skip_dict = self._ConnectList(c, node_list, procedure)
379
    skip_dict.update(c.GetResults())
380
    return skip_dict
381

    
382
  @classmethod
383
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
384
                           address_list=None):
385
    """Helper for making a multi-node static call
386

387
    """
388
    body = serializer.DumpJson(args, indent=False)
389
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390
    c.ConnectList(node_list, address_list=address_list)
391
    return c.GetResults()
392

    
393
  def _SingleNodeCall(self, node, procedure, args):
394
    """Helper for making a single-node call
395

396
    """
397
    body = serializer.DumpJson(args, indent=False)
398
    c = Client(procedure, body, self.port)
399
    result = self._ConnectNode(c, node, procedure)
400
    if result is None:
401
      # we did connect, node is not offline
402
      result = c.GetResults()[node]
403
    return result
404

    
405
  @classmethod
406
  def _StaticSingleNodeCall(cls, node, procedure, args):
407
    """Helper for making a single-node static call
408

409
    """
410
    body = serializer.DumpJson(args, indent=False)
411
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
412
    c.ConnectNode(node)
413
    return c.GetResults()[node]
414

    
415
  @staticmethod
416
  def _Compress(data):
417
    """Compresses a string for transport over RPC.
418

419
    Small amounts of data are not compressed.
420

421
    @type data: str
422
    @param data: Data
423
    @rtype: tuple
424
    @return: Encoded data to send
425

426
    """
427
    # Small amounts of data are not compressed
428
    if len(data) < 512:
429
      return (constants.RPC_ENCODING_NONE, data)
430

    
431
    # Compress with zlib and encode in base64
432
    return (constants.RPC_ENCODING_ZLIB_BASE64,
433
            base64.b64encode(zlib.compress(data, 3)))
434

    
435
  #
436
  # Begin RPC calls
437
  #
438

    
439
  def call_lv_list(self, node_list, vg_name):
440
    """Gets the logical volumes present in a given volume group.
441

442
    This is a multi-node call.
443

444
    """
445
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
446

    
447
  def call_vg_list(self, node_list):
448
    """Gets the volume group list.
449

450
    This is a multi-node call.
451

452
    """
453
    return self._MultiNodeCall(node_list, "vg_list", [])
454

    
455
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
456
    """Get list of storage units.
457

458
    This is a multi-node call.
459

460
    """
461
    return self._MultiNodeCall(node_list, "storage_list",
462
                               [su_name, su_args, name, fields])
463

    
464
  def call_storage_modify(self, node, su_name, su_args, name, changes):
465
    """Modify a storage unit.
466

467
    This is a single-node call.
468

469
    """
470
    return self._SingleNodeCall(node, "storage_modify",
471
                                [su_name, su_args, name, changes])
472

    
473
  def call_storage_execute(self, node, su_name, su_args, name, op):
474
    """Executes an operation on a storage unit.
475

476
    This is a single-node call.
477

478
    """
479
    return self._SingleNodeCall(node, "storage_execute",
480
                                [su_name, su_args, name, op])
481

    
482
  def call_bridges_exist(self, node, bridges_list):
483
    """Checks if a node has all the bridges given.
484

485
    This method checks if all bridges given in the bridges_list are
486
    present on the remote node, so that an instance that uses interfaces
487
    on those bridges can be started.
488

489
    This is a single-node call.
490

491
    """
492
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
493

    
494
  def call_instance_start(self, node, instance, hvp, bep):
495
    """Starts an instance.
496

497
    This is a single-node call.
498

499
    """
500
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
501
    return self._SingleNodeCall(node, "instance_start", [idict])
502

    
503
  def call_instance_shutdown(self, node, instance, timeout):
504
    """Stops an instance.
505

506
    This is a single-node call.
507

508
    """
509
    return self._SingleNodeCall(node, "instance_shutdown",
510
                                [self._InstDict(instance), timeout])
511

    
512
  def call_migration_info(self, node, instance):
513
    """Gather the information necessary to prepare an instance migration.
514

515
    This is a single-node call.
516

517
    @type node: string
518
    @param node: the node on which the instance is currently running
519
    @type instance: C{objects.Instance}
520
    @param instance: the instance definition
521

522
    """
523
    return self._SingleNodeCall(node, "migration_info",
524
                                [self._InstDict(instance)])
525

    
526
  def call_accept_instance(self, node, instance, info, target):
527
    """Prepare a node to accept an instance.
528

529
    This is a single-node call.
530

531
    @type node: string
532
    @param node: the target node for the migration
533
    @type instance: C{objects.Instance}
534
    @param instance: the instance definition
535
    @type info: opaque/hypervisor specific (string/data)
536
    @param info: result for the call_migration_info call
537
    @type target: string
538
    @param target: target hostname (usually ip address) (on the node itself)
539

540
    """
541
    return self._SingleNodeCall(node, "accept_instance",
542
                                [self._InstDict(instance), info, target])
543

    
544
  def call_finalize_migration(self, node, instance, info, success):
545
    """Finalize any target-node migration specific operation.
546

547
    This is called both in case of a successful migration and in case of error
548
    (in which case it should abort the migration).
549

550
    This is a single-node call.
551

552
    @type node: string
553
    @param node: the target node for the migration
554
    @type instance: C{objects.Instance}
555
    @param instance: the instance definition
556
    @type info: opaque/hypervisor specific (string/data)
557
    @param info: result for the call_migration_info call
558
    @type success: boolean
559
    @param success: whether the migration was a success or a failure
560

561
    """
562
    return self._SingleNodeCall(node, "finalize_migration",
563
                                [self._InstDict(instance), info, success])
564

    
565
  def call_instance_migrate(self, node, instance, target, live):
566
    """Migrate an instance.
567

568
    This is a single-node call.
569

570
    @type node: string
571
    @param node: the node on which the instance is currently running
572
    @type instance: C{objects.Instance}
573
    @param instance: the instance definition
574
    @type target: string
575
    @param target: the target node name
576
    @type live: boolean
577
    @param live: whether the migration should be done live or not (the
578
        interpretation of this parameter is left to the hypervisor)
579

580
    """
581
    return self._SingleNodeCall(node, "instance_migrate",
582
                                [self._InstDict(instance), target, live])
583

    
584
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
585
    """Reboots an instance.
586

587
    This is a single-node call.
588

589
    """
590
    return self._SingleNodeCall(node, "instance_reboot",
591
                                [self._InstDict(inst), reboot_type,
592
                                 shutdown_timeout])
593

    
594
  def call_instance_os_add(self, node, inst, reinstall, debug):
595
    """Installs an OS on the given instance.
596

597
    This is a single-node call.
598

599
    """
600
    return self._SingleNodeCall(node, "instance_os_add",
601
                                [self._InstDict(inst), reinstall, debug])
602

    
603
  def call_instance_run_rename(self, node, inst, old_name, debug):
604
    """Run the OS rename script for an instance.
605

606
    This is a single-node call.
607

608
    """
609
    return self._SingleNodeCall(node, "instance_run_rename",
610
                                [self._InstDict(inst), old_name, debug])
611

    
612
  def call_instance_info(self, node, instance, hname):
613
    """Returns information about a single instance.
614

615
    This is a single-node call.
616

617
    @type node: list
618
    @param node: the list of nodes to query
619
    @type instance: string
620
    @param instance: the instance name
621
    @type hname: string
622
    @param hname: the hypervisor type of the instance
623

624
    """
625
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
626

    
627
  def call_instance_migratable(self, node, instance):
628
    """Checks whether the given instance can be migrated.
629

630
    This is a single-node call.
631

632
    @param node: the node to query
633
    @type instance: L{objects.Instance}
634
    @param instance: the instance to check
635

636

637
    """
638
    return self._SingleNodeCall(node, "instance_migratable",
639
                                [self._InstDict(instance)])
640

    
641
  def call_all_instances_info(self, node_list, hypervisor_list):
642
    """Returns information about all instances on the given nodes.
643

644
    This is a multi-node call.
645

646
    @type node_list: list
647
    @param node_list: the list of nodes to query
648
    @type hypervisor_list: list
649
    @param hypervisor_list: the hypervisors to query for instances
650

651
    """
652
    return self._MultiNodeCall(node_list, "all_instances_info",
653
                               [hypervisor_list])
654

    
655
  def call_instance_list(self, node_list, hypervisor_list):
656
    """Returns the list of running instances on a given node.
657

658
    This is a multi-node call.
659

660
    @type node_list: list
661
    @param node_list: the list of nodes to query
662
    @type hypervisor_list: list
663
    @param hypervisor_list: the hypervisors to query for instances
664

665
    """
666
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
667

    
668
  def call_node_tcp_ping(self, node, source, target, port, timeout,
669
                         live_port_needed):
670
    """Do a TcpPing on the remote node
671

672
    This is a single-node call.
673

674
    """
675
    return self._SingleNodeCall(node, "node_tcp_ping",
676
                                [source, target, port, timeout,
677
                                 live_port_needed])
678

    
679
  def call_node_has_ip_address(self, node, address):
680
    """Checks if a node has the given IP address.
681

682
    This is a single-node call.
683

684
    """
685
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
686

    
687
  def call_node_info(self, node_list, vg_name, hypervisor_type):
688
    """Return node information.
689

690
    This will return memory information and volume group size and free
691
    space.
692

693
    This is a multi-node call.
694

695
    @type node_list: list
696
    @param node_list: the list of nodes to query
697
    @type vg_name: C{string}
698
    @param vg_name: the name of the volume group to ask for disk space
699
        information
700
    @type hypervisor_type: C{str}
701
    @param hypervisor_type: the name of the hypervisor to ask for
702
        memory information
703

704
    """
705
    return self._MultiNodeCall(node_list, "node_info",
706
                               [vg_name, hypervisor_type])
707

    
708
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
709
    """Add a node to the cluster.
710

711
    This is a single-node call.
712

713
    """
714
    return self._SingleNodeCall(node, "node_add",
715
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
716

    
717
  def call_node_verify(self, node_list, checkdict, cluster_name):
718
    """Request verification of given parameters.
719

720
    This is a multi-node call.
721

722
    """
723
    return self._MultiNodeCall(node_list, "node_verify",
724
                               [checkdict, cluster_name])
725

    
726
  @classmethod
727
  def call_node_start_master(cls, node, start_daemons, no_voting):
728
    """Tells a node to activate itself as a master.
729

730
    This is a single-node call.
731

732
    """
733
    return cls._StaticSingleNodeCall(node, "node_start_master",
734
                                     [start_daemons, no_voting])
735

    
736
  @classmethod
737
  def call_node_stop_master(cls, node, stop_daemons):
738
    """Tells a node to demote itself from master status.
739

740
    This is a single-node call.
741

742
    """
743
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
744

    
745
  @classmethod
746
  def call_master_info(cls, node_list):
747
    """Query master info.
748

749
    This is a multi-node call.
750

751
    """
752
    # TODO: should this method query down nodes?
753
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
754

    
755
  @classmethod
756
  def call_version(cls, node_list):
757
    """Query node version.
758

759
    This is a multi-node call.
760

761
    """
762
    return cls._StaticMultiNodeCall(node_list, "version", [])
763

    
764
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
765
    """Request creation of a given block device.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "blockdev_create",
771
                                [bdev.ToDict(), size, owner, on_primary, info])
772

    
773
  def call_blockdev_remove(self, node, bdev):
774
    """Request removal of a given block device.
775

776
    This is a single-node call.
777

778
    """
779
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
780

    
781
  def call_blockdev_rename(self, node, devlist):
782
    """Request rename of the given block devices.
783

784
    This is a single-node call.
785

786
    """
787
    return self._SingleNodeCall(node, "blockdev_rename",
788
                                [(d.ToDict(), uid) for d, uid in devlist])
789

    
790
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
791
    """Request assembling of a given block device.
792

793
    This is a single-node call.
794

795
    """
796
    return self._SingleNodeCall(node, "blockdev_assemble",
797
                                [disk.ToDict(), owner, on_primary])
798

    
799
  def call_blockdev_shutdown(self, node, disk):
800
    """Request shutdown of a given block device.
801

802
    This is a single-node call.
803

804
    """
805
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
806

    
807
  def call_blockdev_addchildren(self, node, bdev, ndevs):
808
    """Request adding a list of children to a (mirroring) device.
809

810
    This is a single-node call.
811

812
    """
813
    return self._SingleNodeCall(node, "blockdev_addchildren",
814
                                [bdev.ToDict(),
815
                                 [disk.ToDict() for disk in ndevs]])
816

    
817
  def call_blockdev_removechildren(self, node, bdev, ndevs):
818
    """Request removing a list of children from a (mirroring) device.
819

820
    This is a single-node call.
821

822
    """
823
    return self._SingleNodeCall(node, "blockdev_removechildren",
824
                                [bdev.ToDict(),
825
                                 [disk.ToDict() for disk in ndevs]])
826

    
827
  def call_blockdev_getmirrorstatus(self, node, disks):
828
    """Request status of a (mirroring) device.
829

830
    This is a single-node call.
831

832
    """
833
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
834
                                  [dsk.ToDict() for dsk in disks])
835
    if not result.fail_msg:
836
      result.payload = [objects.BlockDevStatus.FromDict(i)
837
                        for i in result.payload]
838
    return result
839

    
840
  def call_blockdev_find(self, node, disk):
841
    """Request identification of a given block device.
842

843
    This is a single-node call.
844

845
    """
846
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
847
    if not result.fail_msg and result.payload is not None:
848
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
849
    return result
850

    
851
  def call_blockdev_close(self, node, instance_name, disks):
852
    """Closes the given block devices.
853

854
    This is a single-node call.
855

856
    """
857
    params = [instance_name, [cf.ToDict() for cf in disks]]
858
    return self._SingleNodeCall(node, "blockdev_close", params)
859

    
860
  def call_blockdev_getsizes(self, node, disks):
861
    """Returns the size of the given disks.
862

863
    This is a single-node call.
864

865
    """
866
    params = [[cf.ToDict() for cf in disks]]
867
    return self._SingleNodeCall(node, "blockdev_getsize", params)
868

    
869
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
870
    """Disconnects the network of the given drbd devices.
871

872
    This is a multi-node call.
873

874
    """
875
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
876
                               [nodes_ip, [cf.ToDict() for cf in disks]])
877

    
878
  def call_drbd_attach_net(self, node_list, nodes_ip,
879
                           disks, instance_name, multimaster):
880
    """Disconnects the given drbd devices.
881

882
    This is a multi-node call.
883

884
    """
885
    return self._MultiNodeCall(node_list, "drbd_attach_net",
886
                               [nodes_ip, [cf.ToDict() for cf in disks],
887
                                instance_name, multimaster])
888

    
889
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
890
    """Waits for the synchronization of drbd devices is complete.
891

892
    This is a multi-node call.
893

894
    """
895
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
896
                               [nodes_ip, [cf.ToDict() for cf in disks]])
897

    
898
  @classmethod
899
  def call_upload_file(cls, node_list, file_name, address_list=None):
900
    """Upload a file.
901

902
    The node will refuse the operation in case the file is not on the
903
    approved file list.
904

905
    This is a multi-node call.
906

907
    @type node_list: list
908
    @param node_list: the list of node names to upload to
909
    @type file_name: str
910
    @param file_name: the filename to upload
911
    @type address_list: list or None
912
    @keyword address_list: an optional list of node addresses, in order
913
        to optimize the RPC speed
914

915
    """
916
    file_contents = utils.ReadFile(file_name)
917
    data = cls._Compress(file_contents)
918
    st = os.stat(file_name)
919
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
920
              st.st_atime, st.st_mtime]
921
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
922
                                    address_list=address_list)
923

    
924
  @classmethod
925
  def call_write_ssconf_files(cls, node_list, values):
926
    """Write ssconf files.
927

928
    This is a multi-node call.
929

930
    """
931
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
932

    
933
  def call_os_diagnose(self, node_list):
934
    """Request a diagnose of OS definitions.
935

936
    This is a multi-node call.
937

938
    """
939
    return self._MultiNodeCall(node_list, "os_diagnose", [])
940

    
941
  def call_os_get(self, node, name):
942
    """Returns an OS definition.
943

944
    This is a single-node call.
945

946
    """
947
    result = self._SingleNodeCall(node, "os_get", [name])
948
    if not result.fail_msg and isinstance(result.payload, dict):
949
      result.payload = objects.OS.FromDict(result.payload)
950
    return result
951

    
952
  def call_hooks_runner(self, node_list, hpath, phase, env):
953
    """Call the hooks runner.
954

955
    Args:
956
      - op: the OpCode instance
957
      - env: a dictionary with the environment
958

959
    This is a multi-node call.
960

961
    """
962
    params = [hpath, phase, env]
963
    return self._MultiNodeCall(node_list, "hooks_runner", params)
964

    
965
  def call_iallocator_runner(self, node, name, idata):
966
    """Call an iallocator on a remote node
967

968
    Args:
969
      - name: the iallocator name
970
      - input: the json-encoded input string
971

972
    This is a single-node call.
973

974
    """
975
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
976

    
977
  def call_blockdev_grow(self, node, cf_bdev, amount):
978
    """Request a snapshot of the given block device.
979

980
    This is a single-node call.
981

982
    """
983
    return self._SingleNodeCall(node, "blockdev_grow",
984
                                [cf_bdev.ToDict(), amount])
985

    
986
  def call_blockdev_export(self, node, cf_bdev,
987
                           dest_node, dest_path, cluster_name):
988
    """Export a given disk to another node.
989

990
    This is a single-node call.
991

992
    """
993
    return self._SingleNodeCall(node, "blockdev_export",
994
                                [cf_bdev.ToDict(), dest_node, dest_path,
995
                                 cluster_name])
996

    
997
  def call_blockdev_snapshot(self, node, cf_bdev):
998
    """Request a snapshot of the given block device.
999

1000
    This is a single-node call.
1001

1002
    """
1003
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1004

    
1005
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
1006
                           cluster_name, idx, debug):
1007
    """Request the export of a given snapshot.
1008

1009
    This is a single-node call.
1010

1011
    """
1012
    return self._SingleNodeCall(node, "snapshot_export",
1013
                                [snap_bdev.ToDict(), dest_node,
1014
                                 self._InstDict(instance), cluster_name,
1015
                                 idx, debug])
1016

    
1017
  def call_finalize_export(self, node, instance, snap_disks):
1018
    """Request the completion of an export operation.
1019

1020
    This writes the export config file, etc.
1021

1022
    This is a single-node call.
1023

1024
    """
1025
    flat_disks = []
1026
    for disk in snap_disks:
1027
      if isinstance(disk, bool):
1028
        flat_disks.append(disk)
1029
      else:
1030
        flat_disks.append(disk.ToDict())
1031

    
1032
    return self._SingleNodeCall(node, "finalize_export",
1033
                                [self._InstDict(instance), flat_disks])
1034

    
1035
  def call_export_info(self, node, path):
1036
    """Queries the export information in a given path.
1037

1038
    This is a single-node call.
1039

1040
    """
1041
    return self._SingleNodeCall(node, "export_info", [path])
1042

    
1043
  def call_instance_os_import(self, node, inst, src_node, src_images,
1044
                              cluster_name, debug):
1045
    """Request the import of a backup into an instance.
1046

1047
    This is a single-node call.
1048

1049
    """
1050
    return self._SingleNodeCall(node, "instance_os_import",
1051
                                [self._InstDict(inst), src_node, src_images,
1052
                                 cluster_name, debug])
1053

    
1054
  def call_export_list(self, node_list):
1055
    """Gets the stored exports list.
1056

1057
    This is a multi-node call.
1058

1059
    """
1060
    return self._MultiNodeCall(node_list, "export_list", [])
1061

    
1062
  def call_export_remove(self, node, export):
1063
    """Requests removal of a given export.
1064

1065
    This is a single-node call.
1066

1067
    """
1068
    return self._SingleNodeCall(node, "export_remove", [export])
1069

    
1070
  @classmethod
1071
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1072
    """Requests a node to clean the cluster information it has.
1073

1074
    This will remove the configuration information from the ganeti data
1075
    dir.
1076

1077
    This is a single-node call.
1078

1079
    """
1080
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1081
                                     [modify_ssh_setup])
1082

    
1083
  def call_node_volumes(self, node_list):
1084
    """Gets all volumes on node(s).
1085

1086
    This is a multi-node call.
1087

1088
    """
1089
    return self._MultiNodeCall(node_list, "node_volumes", [])
1090

    
1091
  def call_node_demote_from_mc(self, node):
1092
    """Demote a node from the master candidate role.
1093

1094
    This is a single-node call.
1095

1096
    """
1097
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1098

    
1099
  def call_node_powercycle(self, node, hypervisor):
1100
    """Tries to powercycle a node.
1101

1102
    This is a single-node call.
1103

1104
    """
1105
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1106

    
1107
  def call_test_delay(self, node_list, duration):
1108
    """Sleep for a fixed time on given node(s).
1109

1110
    This is a multi-node call.
1111

1112
    """
1113
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1114

    
1115
  def call_file_storage_dir_create(self, node, file_storage_dir):
1116
    """Create the given file storage directory.
1117

1118
    This is a single-node call.
1119

1120
    """
1121
    return self._SingleNodeCall(node, "file_storage_dir_create",
1122
                                [file_storage_dir])
1123

    
1124
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1125
    """Remove the given file storage directory.
1126

1127
    This is a single-node call.
1128

1129
    """
1130
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1131
                                [file_storage_dir])
1132

    
1133
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1134
                                   new_file_storage_dir):
1135
    """Rename file storage directory.
1136

1137
    This is a single-node call.
1138

1139
    """
1140
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1141
                                [old_file_storage_dir, new_file_storage_dir])
1142

    
1143
  @classmethod
1144
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1145
    """Update job queue.
1146

1147
    This is a multi-node call.
1148

1149
    """
1150
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1151
                                    [file_name, cls._Compress(content)],
1152
                                    address_list=address_list)
1153

    
1154
  @classmethod
1155
  def call_jobqueue_purge(cls, node):
1156
    """Purge job queue.
1157

1158
    This is a single-node call.
1159

1160
    """
1161
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1162

    
1163
  @classmethod
1164
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1165
    """Rename a job queue file.
1166

1167
    This is a multi-node call.
1168

1169
    """
1170
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1171
                                    address_list=address_list)
1172

    
1173
  @classmethod
1174
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1175
    """Set the drain flag on the queue.
1176

1177
    This is a multi-node call.
1178

1179
    @type node_list: list
1180
    @param node_list: the list of nodes to query
1181
    @type drain_flag: bool
1182
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1183

1184
    """
1185
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1186
                                    [drain_flag])
1187

    
1188
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1189
    """Validate the hypervisor params.
1190

1191
    This is a multi-node call.
1192

1193
    @type node_list: list
1194
    @param node_list: the list of nodes to query
1195
    @type hvname: string
1196
    @param hvname: the hypervisor name
1197
    @type hvparams: dict
1198
    @param hvparams: the hypervisor parameters to be validated
1199

1200
    """
1201
    cluster = self._cfg.GetClusterInfo()
1202
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1203
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1204
                               [hvname, hv_full])
1205

    
1206
  def call_x509_cert_create(self, node, validity):
1207
    """Creates a new X509 certificate for SSL/TLS.
1208

1209
    This is a single-node call.
1210

1211
    @type validity: int
1212
    @param validity: Validity in seconds
1213

1214
    """
1215
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1216

    
1217
  def call_x509_cert_remove(self, node, name):
1218
    """Removes a X509 certificate.
1219

1220
    This is a single-node call.
1221

1222
    @type name: string
1223
    @param name: Certificate name
1224

1225
    """
1226
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1227

    
1228
  def call_import_start(self, node, x509_key_name, source_x509_ca,
1229
                        instance, dest, dest_args):
1230
    """Starts a listener for an import.
1231

1232
    This is a single-node call.
1233

1234
    @type node: string
1235
    @param node: Node name
1236
    @type instance: C{objects.Instance}
1237
    @param instance: Instance object
1238

1239
    """
1240
    return self._SingleNodeCall(node, "import_start",
1241
                                [x509_key_name, source_x509_ca,
1242
                                 self._InstDict(instance), dest,
1243
                                 _EncodeImportExportIO(dest, dest_args)])
1244

    
1245
  def call_export_start(self, node, x509_key_name, dest_x509_ca, host, port,
1246
                        instance, source, source_args):
1247
    """Starts an export daemon.
1248

1249
    This is a single-node call.
1250

1251
    @type node: string
1252
    @param node: Node name
1253
    @type instance: C{objects.Instance}
1254
    @param instance: Instance object
1255

1256
    """
1257
    return self._SingleNodeCall(node, "export_start",
1258
                                [x509_key_name, dest_x509_ca, host, port,
1259
                                 self._InstDict(instance), source,
1260
                                 _EncodeImportExportIO(source, source_args)])
1261

    
1262
  def call_impexp_status(self, node, names):
1263
    """Gets the status of an import or export.
1264

1265
    This is a single-node call.
1266

1267
    @type node: string
1268
    @param node: Node name
1269
    @type names: List of strings
1270
    @param names: Import/export names
1271
    @rtype: List of L{objects.ImportExportStatus} instances
1272
    @return: Returns a list of the state of each named import/export or None if
1273
             a status couldn't be retrieved
1274

1275
    """
1276
    result = self._SingleNodeCall(node, "impexp_status", [names])
1277

    
1278
    if not result.fail_msg:
1279
      decoded = []
1280

    
1281
      for i in result.payload:
1282
        if i is None:
1283
          decoded.append(None)
1284
          continue
1285
        decoded.append(objects.ImportExportStatus.FromDict(i))
1286

    
1287
      result.payload = decoded
1288

    
1289
    return result
1290

    
1291
  def call_impexp_abort(self, node, name):
1292
    """Aborts an import or export.
1293

1294
    This is a single-node call.
1295

1296
    @type node: string
1297
    @param node: Node name
1298
    @type name: string
1299
    @param name: Import/export name
1300

1301
    """
1302
    return self._SingleNodeCall(node, "impexp_abort", [name])
1303

    
1304
  def call_impexp_cleanup(self, node, name):
1305
    """Cleans up after an import or export.
1306

1307
    This is a single-node call.
1308

1309
    @type node: string
1310
    @param node: Node name
1311
    @type name: string
1312
    @param name: Import/export name
1313

1314
    """
1315
    return self._SingleNodeCall(node, "impexp_cleanup", [name])