Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 3e06e001

History | View | Annotate | Download (32.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @type failed: boolean
87
  @ivar failed: whether the operation failed at transport level (not
88
      application level on the remote node)
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.fail_msg = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.fail_msg = self._EnsureErr(data)
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      if not isinstance(self.data, (tuple, list)):
114
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115
                         type(self.data))
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
      elif not self.data[0]:
120
        self.fail_msg = self._EnsureErr(self.data[1])
121
      else:
122
        # finally success
123
        self.fail_msg = None
124
        self.payload = data[1]
125

    
126
  @staticmethod
127
  def _EnsureErr(val):
128
    """Helper to ensure we return a 'True' value for error."""
129
    if val:
130
      return val
131
    else:
132
      return "No error information"
133

    
134
  def Raise(self, msg, prereq=False):
135
    """If the result has failed, raise an OpExecError.
136

137
    This is used so that LU code doesn't have to check for each
138
    result, but instead can call this function.
139

140
    """
141
    if not self.fail_msg:
142
      return
143

    
144
    if not msg: # one could pass None for default message
145
      msg = ("Call '%s' to node '%s' has failed: %s" %
146
             (self.call, self.node, self.fail_msg))
147
    else:
148
      msg = "%s: %s" % (msg, self.fail_msg)
149
    if prereq:
150
      ec = errors.OpPrereqError
151
    else:
152
      ec = errors.OpExecError
153
    raise ec(msg)
154

    
155
  def RemoteFailMsg(self):
156
    """Check if the remote procedure failed.
157

158
    @return: the fail_msg attribute
159

160
    """
161
    return self.fail_msg
162

    
163

    
164
class Client:
165
  """RPC Client class.
166

167
  This class, given a (remote) method name, a list of parameters and a
168
  list of nodes, will contact (in parallel) all nodes, and return a
169
  dict of results (key: node name, value: result).
170

171
  One current bug is that generic failure is still signaled by
172
  'False' result, which is not good. This overloading of values can
173
  cause bugs.
174

175
  """
176
  def __init__(self, procedure, body, port):
177
    self.procedure = procedure
178
    self.body = body
179
    self.port = port
180
    self.nc = {}
181

    
182
    self._ssl_params = \
183
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184
                         ssl_cert_path=constants.SSL_CERT_FILE)
185

    
186
  def ConnectList(self, node_list, address_list=None):
187
    """Add a list of nodes to the target nodes.
188

189
    @type node_list: list
190
    @param node_list: the list of node names to connect
191
    @type address_list: list or None
192
    @keyword address_list: either None or a list with node addresses,
193
        which must have the same length as the node list
194

195
    """
196
    if address_list is None:
197
      address_list = [None for _ in node_list]
198
    else:
199
      assert len(node_list) == len(address_list), \
200
             "Name and address lists should have the same length"
201
    for node, address in zip(node_list, address_list):
202
      self.ConnectNode(node, address)
203

    
204
  def ConnectNode(self, name, address=None):
205
    """Add a node to the target list.
206

207
    @type name: str
208
    @param name: the node name
209
    @type address: str
210
    @keyword address: the node address, if known
211

212
    """
213
    if address is None:
214
      address = name
215

    
216
    self.nc[name] = \
217
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218
                                    "/%s" % self.procedure,
219
                                    post_data=self.body,
220
                                    ssl_params=self._ssl_params,
221
                                    ssl_verify_peer=True)
222

    
223
  def GetResults(self):
224
    """Call nodes and return results.
225

226
    @rtype: list
227
    @return: List of RPC results
228

229
    """
230
    assert _http_manager, "RPC module not initialized"
231

    
232
    _http_manager.ExecRequests(self.nc.values())
233

    
234
    results = {}
235

    
236
    for name, req in self.nc.iteritems():
237
      if req.success and req.resp_status_code == http.HTTP_OK:
238
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239
                                  node=name, call=self.procedure)
240
        continue
241

    
242
      # TODO: Better error reporting
243
      if req.error:
244
        msg = req.error
245
      else:
246
        msg = req.resp_body
247

    
248
      logging.error("RPC error in %s from node %s: %s",
249
                    self.procedure, name, msg)
250
      results[name] = RpcResult(data=msg, failed=True, node=name,
251
                                call=self.procedure)
252

    
253
    return results
254

    
255

    
256
class RpcRunner(object):
257
  """RPC runner class"""
258

    
259
  def __init__(self, cfg):
260
    """Initialized the rpc runner.
261

262
    @type cfg:  C{config.ConfigWriter}
263
    @param cfg: the configuration object that will be used to get data
264
                about the cluster
265

266
    """
267
    self._cfg = cfg
268
    self.port = utils.GetNodeDaemonPort()
269

    
270
  def _InstDict(self, instance, hvp=None, bep=None):
271
    """Convert the given instance to a dict.
272

273
    This is done via the instance's ToDict() method and additionally
274
    we fill the hvparams with the cluster defaults.
275

276
    @type instance: L{objects.Instance}
277
    @param instance: an Instance object
278
    @type hvp: dict or None
279
    @param hvp: a dictionary with overridden hypervisor parameters
280
    @type bep: dict or None
281
    @param bep: a dictionary with overridden backend parameters
282
    @rtype: dict
283
    @return: the instance dict, with the hvparams filled with the
284
        cluster defaults
285

286
    """
287
    idict = instance.ToDict()
288
    cluster = self._cfg.GetClusterInfo()
289
    idict["hvparams"] = cluster.FillHV(instance)
290
    if hvp is not None:
291
      idict["hvparams"].update(hvp)
292
    idict["beparams"] = cluster.FillBE(instance)
293
    if bep is not None:
294
      idict["beparams"].update(bep)
295
    for nic in idict["nics"]:
296
      nic['nicparams'] = objects.FillDict(
297
        cluster.nicparams[constants.PP_DEFAULT],
298
        nic['nicparams'])
299
    return idict
300

    
301
  def _ConnectList(self, client, node_list, call):
302
    """Helper for computing node addresses.
303

304
    @type client: L{ganeti.rpc.Client}
305
    @param client: a C{Client} instance
306
    @type node_list: list
307
    @param node_list: the node list we should connect
308
    @type call: string
309
    @param call: the name of the remote procedure call, for filling in
310
        correctly any eventual offline nodes' results
311

312
    """
313
    all_nodes = self._cfg.GetAllNodesInfo()
314
    name_list = []
315
    addr_list = []
316
    skip_dict = {}
317
    for node in node_list:
318
      if node in all_nodes:
319
        if all_nodes[node].offline:
320
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321
          continue
322
        val = all_nodes[node].primary_ip
323
      else:
324
        val = None
325
      addr_list.append(val)
326
      name_list.append(node)
327
    if name_list:
328
      client.ConnectList(name_list, address_list=addr_list)
329
    return skip_dict
330

    
331
  def _ConnectNode(self, client, node, call):
332
    """Helper for computing one node's address.
333

334
    @type client: L{ganeti.rpc.Client}
335
    @param client: a C{Client} instance
336
    @type node: str
337
    @param node: the node we should connect
338
    @type call: string
339
    @param call: the name of the remote procedure call, for filling in
340
        correctly any eventual offline nodes' results
341

342
    """
343
    node_info = self._cfg.GetNodeInfo(node)
344
    if node_info is not None:
345
      if node_info.offline:
346
        return RpcResult(node=node, offline=True, call=call)
347
      addr = node_info.primary_ip
348
    else:
349
      addr = None
350
    client.ConnectNode(node, address=addr)
351

    
352
  def _MultiNodeCall(self, node_list, procedure, args):
353
    """Helper for making a multi-node call
354

355
    """
356
    body = serializer.DumpJson(args, indent=False)
357
    c = Client(procedure, body, self.port)
358
    skip_dict = self._ConnectList(c, node_list, procedure)
359
    skip_dict.update(c.GetResults())
360
    return skip_dict
361

    
362
  @classmethod
363
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
364
                           address_list=None):
365
    """Helper for making a multi-node static call
366

367
    """
368
    body = serializer.DumpJson(args, indent=False)
369
    c = Client(procedure, body, utils.GetNodeDaemonPort())
370
    c.ConnectList(node_list, address_list=address_list)
371
    return c.GetResults()
372

    
373
  def _SingleNodeCall(self, node, procedure, args):
374
    """Helper for making a single-node call
375

376
    """
377
    body = serializer.DumpJson(args, indent=False)
378
    c = Client(procedure, body, self.port)
379
    result = self._ConnectNode(c, node, procedure)
380
    if result is None:
381
      # we did connect, node is not offline
382
      result = c.GetResults()[node]
383
    return result
384

    
385
  @classmethod
386
  def _StaticSingleNodeCall(cls, node, procedure, args):
387
    """Helper for making a single-node static call
388

389
    """
390
    body = serializer.DumpJson(args, indent=False)
391
    c = Client(procedure, body, utils.GetNodeDaemonPort())
392
    c.ConnectNode(node)
393
    return c.GetResults()[node]
394

    
395
  @staticmethod
396
  def _Compress(data):
397
    """Compresses a string for transport over RPC.
398

399
    Small amounts of data are not compressed.
400

401
    @type data: str
402
    @param data: Data
403
    @rtype: tuple
404
    @return: Encoded data to send
405

406
    """
407
    # Small amounts of data are not compressed
408
    if len(data) < 512:
409
      return (constants.RPC_ENCODING_NONE, data)
410

    
411
    # Compress with zlib and encode in base64
412
    return (constants.RPC_ENCODING_ZLIB_BASE64,
413
            base64.b64encode(zlib.compress(data, 3)))
414

    
415
  #
416
  # Begin RPC calls
417
  #
418

    
419
  def call_lv_list(self, node_list, vg_name):
420
    """Gets the logical volumes present in a given volume group.
421

422
    This is a multi-node call.
423

424
    """
425
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426

    
427
  def call_vg_list(self, node_list):
428
    """Gets the volume group list.
429

430
    This is a multi-node call.
431

432
    """
433
    return self._MultiNodeCall(node_list, "vg_list", [])
434

    
435
  def call_bridges_exist(self, node, bridges_list):
436
    """Checks if a node has all the bridges given.
437

438
    This method checks if all bridges given in the bridges_list are
439
    present on the remote node, so that an instance that uses interfaces
440
    on those bridges can be started.
441

442
    This is a single-node call.
443

444
    """
445
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
446

    
447
  def call_instance_start(self, node, instance, hvp, bep):
448
    """Starts an instance.
449

450
    This is a single-node call.
451

452
    """
453
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
454
    return self._SingleNodeCall(node, "instance_start", [idict])
455

    
456
  def call_instance_shutdown(self, node, instance):
457
    """Stops an instance.
458

459
    This is a single-node call.
460

461
    """
462
    return self._SingleNodeCall(node, "instance_shutdown",
463
                                [self._InstDict(instance)])
464

    
465
  def call_migration_info(self, node, instance):
466
    """Gather the information necessary to prepare an instance migration.
467

468
    This is a single-node call.
469

470
    @type node: string
471
    @param node: the node on which the instance is currently running
472
    @type instance: C{objects.Instance}
473
    @param instance: the instance definition
474

475
    """
476
    return self._SingleNodeCall(node, "migration_info",
477
                                [self._InstDict(instance)])
478

    
479
  def call_accept_instance(self, node, instance, info, target):
480
    """Prepare a node to accept an instance.
481

482
    This is a single-node call.
483

484
    @type node: string
485
    @param node: the target node for the migration
486
    @type instance: C{objects.Instance}
487
    @param instance: the instance definition
488
    @type info: opaque/hypervisor specific (string/data)
489
    @param info: result for the call_migration_info call
490
    @type target: string
491
    @param target: target hostname (usually ip address) (on the node itself)
492

493
    """
494
    return self._SingleNodeCall(node, "accept_instance",
495
                                [self._InstDict(instance), info, target])
496

    
497
  def call_finalize_migration(self, node, instance, info, success):
498
    """Finalize any target-node migration specific operation.
499

500
    This is called both in case of a successful migration and in case of error
501
    (in which case it should abort the migration).
502

503
    This is a single-node call.
504

505
    @type node: string
506
    @param node: the target node for the migration
507
    @type instance: C{objects.Instance}
508
    @param instance: the instance definition
509
    @type info: opaque/hypervisor specific (string/data)
510
    @param info: result for the call_migration_info call
511
    @type success: boolean
512
    @param success: whether the migration was a success or a failure
513

514
    """
515
    return self._SingleNodeCall(node, "finalize_migration",
516
                                [self._InstDict(instance), info, success])
517

    
518
  def call_instance_migrate(self, node, instance, target, live):
519
    """Migrate an instance.
520

521
    This is a single-node call.
522

523
    @type node: string
524
    @param node: the node on which the instance is currently running
525
    @type instance: C{objects.Instance}
526
    @param instance: the instance definition
527
    @type target: string
528
    @param target: the target node name
529
    @type live: boolean
530
    @param live: whether the migration should be done live or not (the
531
        interpretation of this parameter is left to the hypervisor)
532

533
    """
534
    return self._SingleNodeCall(node, "instance_migrate",
535
                                [self._InstDict(instance), target, live])
536

    
537
  def call_instance_reboot(self, node, instance, reboot_type):
538
    """Reboots an instance.
539

540
    This is a single-node call.
541

542
    """
543
    return self._SingleNodeCall(node, "instance_reboot",
544
                                [self._InstDict(instance), reboot_type])
545

    
546
  def call_instance_os_add(self, node, inst, reinstall):
547
    """Installs an OS on the given instance.
548

549
    This is a single-node call.
550

551
    """
552
    return self._SingleNodeCall(node, "instance_os_add",
553
                                [self._InstDict(inst), reinstall])
554

    
555
  def call_instance_run_rename(self, node, inst, old_name):
556
    """Run the OS rename script for an instance.
557

558
    This is a single-node call.
559

560
    """
561
    return self._SingleNodeCall(node, "instance_run_rename",
562
                                [self._InstDict(inst), old_name])
563

    
564
  def call_instance_info(self, node, instance, hname):
565
    """Returns information about a single instance.
566

567
    This is a single-node call.
568

569
    @type node: list
570
    @param node: the list of nodes to query
571
    @type instance: string
572
    @param instance: the instance name
573
    @type hname: string
574
    @param hname: the hypervisor type of the instance
575

576
    """
577
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
578

    
579
  def call_instance_migratable(self, node, instance):
580
    """Checks whether the given instance can be migrated.
581

582
    This is a single-node call.
583

584
    @param node: the node to query
585
    @type instance: L{objects.Instance}
586
    @param instance: the instance to check
587

588

589
    """
590
    return self._SingleNodeCall(node, "instance_migratable",
591
                                [self._InstDict(instance)])
592

    
593
  def call_all_instances_info(self, node_list, hypervisor_list):
594
    """Returns information about all instances on the given nodes.
595

596
    This is a multi-node call.
597

598
    @type node_list: list
599
    @param node_list: the list of nodes to query
600
    @type hypervisor_list: list
601
    @param hypervisor_list: the hypervisors to query for instances
602

603
    """
604
    return self._MultiNodeCall(node_list, "all_instances_info",
605
                               [hypervisor_list])
606

    
607
  def call_instance_list(self, node_list, hypervisor_list):
608
    """Returns the list of running instances on a given node.
609

610
    This is a multi-node call.
611

612
    @type node_list: list
613
    @param node_list: the list of nodes to query
614
    @type hypervisor_list: list
615
    @param hypervisor_list: the hypervisors to query for instances
616

617
    """
618
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
619

    
620
  def call_node_tcp_ping(self, node, source, target, port, timeout,
621
                         live_port_needed):
622
    """Do a TcpPing on the remote node
623

624
    This is a single-node call.
625

626
    """
627
    return self._SingleNodeCall(node, "node_tcp_ping",
628
                                [source, target, port, timeout,
629
                                 live_port_needed])
630

    
631
  def call_node_has_ip_address(self, node, address):
632
    """Checks if a node has the given IP address.
633

634
    This is a single-node call.
635

636
    """
637
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
638

    
639
  def call_node_info(self, node_list, vg_name, hypervisor_type):
640
    """Return node information.
641

642
    This will return memory information and volume group size and free
643
    space.
644

645
    This is a multi-node call.
646

647
    @type node_list: list
648
    @param node_list: the list of nodes to query
649
    @type vg_name: C{string}
650
    @param vg_name: the name of the volume group to ask for disk space
651
        information
652
    @type hypervisor_type: C{str}
653
    @param hypervisor_type: the name of the hypervisor to ask for
654
        memory information
655

656
    """
657
    return self._MultiNodeCall(node_list, "node_info",
658
                               [vg_name, hypervisor_type])
659

    
660
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
661
    """Add a node to the cluster.
662

663
    This is a single-node call.
664

665
    """
666
    return self._SingleNodeCall(node, "node_add",
667
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
668

    
669
  def call_node_verify(self, node_list, checkdict, cluster_name):
670
    """Request verification of given parameters.
671

672
    This is a multi-node call.
673

674
    """
675
    return self._MultiNodeCall(node_list, "node_verify",
676
                               [checkdict, cluster_name])
677

    
678
  @classmethod
679
  def call_node_start_master(cls, node, start_daemons, no_voting):
680
    """Tells a node to activate itself as a master.
681

682
    This is a single-node call.
683

684
    """
685
    return cls._StaticSingleNodeCall(node, "node_start_master",
686
                                     [start_daemons, no_voting])
687

    
688
  @classmethod
689
  def call_node_stop_master(cls, node, stop_daemons):
690
    """Tells a node to demote itself from master status.
691

692
    This is a single-node call.
693

694
    """
695
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
696

    
697
  @classmethod
698
  def call_master_info(cls, node_list):
699
    """Query master info.
700

701
    This is a multi-node call.
702

703
    """
704
    # TODO: should this method query down nodes?
705
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
706

    
707
  def call_version(self, node_list):
708
    """Query node version.
709

710
    This is a multi-node call.
711

712
    """
713
    return self._MultiNodeCall(node_list, "version", [])
714

    
715
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
716
    """Request creation of a given block device.
717

718
    This is a single-node call.
719

720
    """
721
    return self._SingleNodeCall(node, "blockdev_create",
722
                                [bdev.ToDict(), size, owner, on_primary, info])
723

    
724
  def call_blockdev_remove(self, node, bdev):
725
    """Request removal of a given block device.
726

727
    This is a single-node call.
728

729
    """
730
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
731

    
732
  def call_blockdev_rename(self, node, devlist):
733
    """Request rename of the given block devices.
734

735
    This is a single-node call.
736

737
    """
738
    return self._SingleNodeCall(node, "blockdev_rename",
739
                                [(d.ToDict(), uid) for d, uid in devlist])
740

    
741
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
742
    """Request assembling of a given block device.
743

744
    This is a single-node call.
745

746
    """
747
    return self._SingleNodeCall(node, "blockdev_assemble",
748
                                [disk.ToDict(), owner, on_primary])
749

    
750
  def call_blockdev_shutdown(self, node, disk):
751
    """Request shutdown of a given block device.
752

753
    This is a single-node call.
754

755
    """
756
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
757

    
758
  def call_blockdev_addchildren(self, node, bdev, ndevs):
759
    """Request adding a list of children to a (mirroring) device.
760

761
    This is a single-node call.
762

763
    """
764
    return self._SingleNodeCall(node, "blockdev_addchildren",
765
                                [bdev.ToDict(),
766
                                 [disk.ToDict() for disk in ndevs]])
767

    
768
  def call_blockdev_removechildren(self, node, bdev, ndevs):
769
    """Request removing a list of children from a (mirroring) device.
770

771
    This is a single-node call.
772

773
    """
774
    return self._SingleNodeCall(node, "blockdev_removechildren",
775
                                [bdev.ToDict(),
776
                                 [disk.ToDict() for disk in ndevs]])
777

    
778
  def call_blockdev_getmirrorstatus(self, node, disks):
779
    """Request status of a (mirroring) device.
780

781
    This is a single-node call.
782

783
    """
784
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
785
                                [dsk.ToDict() for dsk in disks])
786

    
787
  def call_blockdev_find(self, node, disk):
788
    """Request identification of a given block device.
789

790
    This is a single-node call.
791

792
    """
793
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
794

    
795
  def call_blockdev_close(self, node, instance_name, disks):
796
    """Closes the given block devices.
797

798
    This is a single-node call.
799

800
    """
801
    params = [instance_name, [cf.ToDict() for cf in disks]]
802
    return self._SingleNodeCall(node, "blockdev_close", params)
803

    
804
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
805
    """Disconnects the network of the given drbd devices.
806

807
    This is a multi-node call.
808

809
    """
810
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
811
                               [nodes_ip, [cf.ToDict() for cf in disks]])
812

    
813
  def call_drbd_attach_net(self, node_list, nodes_ip,
814
                           disks, instance_name, multimaster):
815
    """Disconnects the given drbd devices.
816

817
    This is a multi-node call.
818

819
    """
820
    return self._MultiNodeCall(node_list, "drbd_attach_net",
821
                               [nodes_ip, [cf.ToDict() for cf in disks],
822
                                instance_name, multimaster])
823

    
824
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
825
    """Waits for the synchronization of drbd devices is complete.
826

827
    This is a multi-node call.
828

829
    """
830
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
831
                               [nodes_ip, [cf.ToDict() for cf in disks]])
832

    
833
  @classmethod
834
  def call_upload_file(cls, node_list, file_name, address_list=None):
835
    """Upload a file.
836

837
    The node will refuse the operation in case the file is not on the
838
    approved file list.
839

840
    This is a multi-node call.
841

842
    @type node_list: list
843
    @param node_list: the list of node names to upload to
844
    @type file_name: str
845
    @param file_name: the filename to upload
846
    @type address_list: list or None
847
    @keyword address_list: an optional list of node addresses, in order
848
        to optimize the RPC speed
849

850
    """
851
    file_contents = utils.ReadFile(file_name)
852
    data = cls._Compress(file_contents)
853
    st = os.stat(file_name)
854
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
855
              st.st_atime, st.st_mtime]
856
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
857
                                    address_list=address_list)
858

    
859
  @classmethod
860
  def call_write_ssconf_files(cls, node_list, values):
861
    """Write ssconf files.
862

863
    This is a multi-node call.
864

865
    """
866
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
867

    
868
  def call_os_diagnose(self, node_list):
869
    """Request a diagnose of OS definitions.
870

871
    This is a multi-node call.
872

873
    """
874
    return self._MultiNodeCall(node_list, "os_diagnose", [])
875

    
876
  def call_os_get(self, node, name):
877
    """Returns an OS definition.
878

879
    This is a single-node call.
880

881
    """
882
    result = self._SingleNodeCall(node, "os_get", [name])
883
    if not result.failed and isinstance(result.data, dict):
884
      result.data = objects.OS.FromDict(result.data)
885
    return result
886

    
887
  def call_hooks_runner(self, node_list, hpath, phase, env):
888
    """Call the hooks runner.
889

890
    Args:
891
      - op: the OpCode instance
892
      - env: a dictionary with the environment
893

894
    This is a multi-node call.
895

896
    """
897
    params = [hpath, phase, env]
898
    return self._MultiNodeCall(node_list, "hooks_runner", params)
899

    
900
  def call_iallocator_runner(self, node, name, idata):
901
    """Call an iallocator on a remote node
902

903
    Args:
904
      - name: the iallocator name
905
      - input: the json-encoded input string
906

907
    This is a single-node call.
908

909
    """
910
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
911

    
912
  def call_blockdev_grow(self, node, cf_bdev, amount):
913
    """Request a snapshot of the given block device.
914

915
    This is a single-node call.
916

917
    """
918
    return self._SingleNodeCall(node, "blockdev_grow",
919
                                [cf_bdev.ToDict(), amount])
920

    
921
  def call_blockdev_snapshot(self, node, cf_bdev):
922
    """Request a snapshot of the given block device.
923

924
    This is a single-node call.
925

926
    """
927
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
928

    
929
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
930
                           cluster_name, idx):
931
    """Request the export of a given snapshot.
932

933
    This is a single-node call.
934

935
    """
936
    return self._SingleNodeCall(node, "snapshot_export",
937
                                [snap_bdev.ToDict(), dest_node,
938
                                 self._InstDict(instance), cluster_name, idx])
939

    
940
  def call_finalize_export(self, node, instance, snap_disks):
941
    """Request the completion of an export operation.
942

943
    This writes the export config file, etc.
944

945
    This is a single-node call.
946

947
    """
948
    flat_disks = []
949
    for disk in snap_disks:
950
      if isinstance(disk, bool):
951
        flat_disks.append(disk)
952
      else:
953
        flat_disks.append(disk.ToDict())
954

    
955
    return self._SingleNodeCall(node, "finalize_export",
956
                                [self._InstDict(instance), flat_disks])
957

    
958
  def call_export_info(self, node, path):
959
    """Queries the export information in a given path.
960

961
    This is a single-node call.
962

963
    """
964
    return self._SingleNodeCall(node, "export_info", [path])
965

    
966
  def call_instance_os_import(self, node, inst, src_node, src_images,
967
                              cluster_name):
968
    """Request the import of a backup into an instance.
969

970
    This is a single-node call.
971

972
    """
973
    return self._SingleNodeCall(node, "instance_os_import",
974
                                [self._InstDict(inst), src_node, src_images,
975
                                 cluster_name])
976

    
977
  def call_export_list(self, node_list):
978
    """Gets the stored exports list.
979

980
    This is a multi-node call.
981

982
    """
983
    return self._MultiNodeCall(node_list, "export_list", [])
984

    
985
  def call_export_remove(self, node, export):
986
    """Requests removal of a given export.
987

988
    This is a single-node call.
989

990
    """
991
    return self._SingleNodeCall(node, "export_remove", [export])
992

    
993
  @classmethod
994
  def call_node_leave_cluster(cls, node):
995
    """Requests a node to clean the cluster information it has.
996

997
    This will remove the configuration information from the ganeti data
998
    dir.
999

1000
    This is a single-node call.
1001

1002
    """
1003
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1004

    
1005
  def call_node_volumes(self, node_list):
1006
    """Gets all volumes on node(s).
1007

1008
    This is a multi-node call.
1009

1010
    """
1011
    return self._MultiNodeCall(node_list, "node_volumes", [])
1012

    
1013
  def call_node_demote_from_mc(self, node):
1014
    """Demote a node from the master candidate role.
1015

1016
    This is a single-node call.
1017

1018
    """
1019
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1020

    
1021

    
1022
  def call_node_powercycle(self, node, hypervisor):
1023
    """Tries to powercycle a node.
1024

1025
    This is a single-node call.
1026

1027
    """
1028
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1029

    
1030

    
1031
  def call_test_delay(self, node_list, duration):
1032
    """Sleep for a fixed time on given node(s).
1033

1034
    This is a multi-node call.
1035

1036
    """
1037
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1038

    
1039
  def call_file_storage_dir_create(self, node, file_storage_dir):
1040
    """Create the given file storage directory.
1041

1042
    This is a single-node call.
1043

1044
    """
1045
    return self._SingleNodeCall(node, "file_storage_dir_create",
1046
                                [file_storage_dir])
1047

    
1048
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1049
    """Remove the given file storage directory.
1050

1051
    This is a single-node call.
1052

1053
    """
1054
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1055
                                [file_storage_dir])
1056

    
1057
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1058
                                   new_file_storage_dir):
1059
    """Rename file storage directory.
1060

1061
    This is a single-node call.
1062

1063
    """
1064
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1065
                                [old_file_storage_dir, new_file_storage_dir])
1066

    
1067
  @classmethod
1068
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1069
    """Update job queue.
1070

1071
    This is a multi-node call.
1072

1073
    """
1074
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1075
                                    [file_name, cls._Compress(content)],
1076
                                    address_list=address_list)
1077

    
1078
  @classmethod
1079
  def call_jobqueue_purge(cls, node):
1080
    """Purge job queue.
1081

1082
    This is a single-node call.
1083

1084
    """
1085
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1086

    
1087
  @classmethod
1088
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1089
    """Rename a job queue file.
1090

1091
    This is a multi-node call.
1092

1093
    """
1094
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1095
                                    address_list=address_list)
1096

    
1097
  @classmethod
1098
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1099
    """Set the drain flag on the queue.
1100

1101
    This is a multi-node call.
1102

1103
    @type node_list: list
1104
    @param node_list: the list of nodes to query
1105
    @type drain_flag: bool
1106
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1107

1108
    """
1109
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1110
                                    [drain_flag])
1111

    
1112
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1113
    """Validate the hypervisor params.
1114

1115
    This is a multi-node call.
1116

1117
    @type node_list: list
1118
    @param node_list: the list of nodes to query
1119
    @type hvname: string
1120
    @param hvname: the hypervisor name
1121
    @type hvparams: dict
1122
    @param hvparams: the hypervisor parameters to be validated
1123

1124
    """
1125
    cluster = self._cfg.GetClusterInfo()
1126
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1127
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1128
                               [hvname, hv_full])