Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 728489a3

History | View | Annotate | Download (35.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager # pylint: disable-msg=W0603
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  http.InitSsl()
64

    
65
  _http_manager = http.client.HttpClientManager()
66

    
67

    
68
def Shutdown():
69
  """Stops the module-global HTTP client manager.
70

71
  Must be called before quitting the program.
72

73
  """
74
  global _http_manager # pylint: disable-msg=W0603
75

    
76
  if _http_manager:
77
    _http_manager.Shutdown()
78
    _http_manager = None
79

    
80

    
81
class RpcResult(object):
82
  """RPC Result class.
83

84
  This class holds an RPC result. It is needed since in multi-node
85
  calls we can't raise an exception just because one one out of many
86
  failed, and therefore we use this class to encapsulate the result.
87

88
  @ivar data: the data payload, for successful results, or None
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.offline = offline
101
    self.call = call
102
    self.node = node
103

    
104
    if offline:
105
      self.fail_msg = "Node is marked offline"
106
      self.data = self.payload = None
107
    elif failed:
108
      self.fail_msg = self._EnsureErr(data)
109
      self.data = self.payload = None
110
    else:
111
      self.data = data
112
      if not isinstance(self.data, (tuple, list)):
113
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114
                         type(self.data))
115
        self.payload = None
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
        self.payload = None
120
      elif not self.data[0]:
121
        self.fail_msg = self._EnsureErr(self.data[1])
122
        self.payload = None
123
      else:
124
        # finally success
125
        self.fail_msg = None
126
        self.payload = data[1]
127

    
128
    assert hasattr(self, "call")
129
    assert hasattr(self, "data")
130
    assert hasattr(self, "fail_msg")
131
    assert hasattr(self, "node")
132
    assert hasattr(self, "offline")
133
    assert hasattr(self, "payload")
134

    
135
  @staticmethod
136
  def _EnsureErr(val):
137
    """Helper to ensure we return a 'True' value for error."""
138
    if val:
139
      return val
140
    else:
141
      return "No error information"
142

    
143
  def Raise(self, msg, prereq=False, ecode=None):
144
    """If the result has failed, raise an OpExecError.
145

146
    This is used so that LU code doesn't have to check for each
147
    result, but instead can call this function.
148

149
    """
150
    if not self.fail_msg:
151
      return
152

    
153
    if not msg: # one could pass None for default message
154
      msg = ("Call '%s' to node '%s' has failed: %s" %
155
             (self.call, self.node, self.fail_msg))
156
    else:
157
      msg = "%s: %s" % (msg, self.fail_msg)
158
    if prereq:
159
      ec = errors.OpPrereqError
160
    else:
161
      ec = errors.OpExecError
162
    if ecode is not None:
163
      args = (msg, prereq)
164
    else:
165
      args = (msg, )
166
    raise ec(*args) # pylint: disable-msg=W0142
167

    
168

    
169
class Client:
170
  """RPC Client class.
171

172
  This class, given a (remote) method name, a list of parameters and a
173
  list of nodes, will contact (in parallel) all nodes, and return a
174
  dict of results (key: node name, value: result).
175

176
  One current bug is that generic failure is still signaled by
177
  'False' result, which is not good. This overloading of values can
178
  cause bugs.
179

180
  """
181
  def __init__(self, procedure, body, port):
182
    self.procedure = procedure
183
    self.body = body
184
    self.port = port
185
    self.nc = {}
186

    
187
    self._ssl_params = \
188
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189
                         ssl_cert_path=constants.NODED_CERT_FILE)
190

    
191
  def ConnectList(self, node_list, address_list=None):
192
    """Add a list of nodes to the target nodes.
193

194
    @type node_list: list
195
    @param node_list: the list of node names to connect
196
    @type address_list: list or None
197
    @keyword address_list: either None or a list with node addresses,
198
        which must have the same length as the node list
199

200
    """
201
    if address_list is None:
202
      address_list = [None for _ in node_list]
203
    else:
204
      assert len(node_list) == len(address_list), \
205
             "Name and address lists should have the same length"
206
    for node, address in zip(node_list, address_list):
207
      self.ConnectNode(node, address)
208

    
209
  def ConnectNode(self, name, address=None):
210
    """Add a node to the target list.
211

212
    @type name: str
213
    @param name: the node name
214
    @type address: str
215
    @keyword address: the node address, if known
216

217
    """
218
    if address is None:
219
      address = name
220

    
221
    self.nc[name] = \
222
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223
                                    "/%s" % self.procedure,
224
                                    post_data=self.body,
225
                                    ssl_params=self._ssl_params,
226
                                    ssl_verify_peer=True)
227

    
228
  def GetResults(self):
229
    """Call nodes and return results.
230

231
    @rtype: list
232
    @return: List of RPC results
233

234
    """
235
    assert _http_manager, "RPC module not initialized"
236

    
237
    _http_manager.ExecRequests(self.nc.values())
238

    
239
    results = {}
240

    
241
    for name, req in self.nc.iteritems():
242
      if req.success and req.resp_status_code == http.HTTP_OK:
243
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244
                                  node=name, call=self.procedure)
245
        continue
246

    
247
      # TODO: Better error reporting
248
      if req.error:
249
        msg = req.error
250
      else:
251
        msg = req.resp_body
252

    
253
      logging.error("RPC error in %s from node %s: %s",
254
                    self.procedure, name, msg)
255
      results[name] = RpcResult(data=msg, failed=True, node=name,
256
                                call=self.procedure)
257

    
258
    return results
259

    
260

    
261
class RpcRunner(object):
262
  """RPC runner class"""
263

    
264
  def __init__(self, cfg):
265
    """Initialized the rpc runner.
266

267
    @type cfg:  C{config.ConfigWriter}
268
    @param cfg: the configuration object that will be used to get data
269
                about the cluster
270

271
    """
272
    self._cfg = cfg
273
    self.port = utils.GetDaemonPort(constants.NODED)
274

    
275
  def _InstDict(self, instance, hvp=None, bep=None):
276
    """Convert the given instance to a dict.
277

278
    This is done via the instance's ToDict() method and additionally
279
    we fill the hvparams with the cluster defaults.
280

281
    @type instance: L{objects.Instance}
282
    @param instance: an Instance object
283
    @type hvp: dict or None
284
    @param hvp: a dictionary with overridden hypervisor parameters
285
    @type bep: dict or None
286
    @param bep: a dictionary with overridden backend parameters
287
    @rtype: dict
288
    @return: the instance dict, with the hvparams filled with the
289
        cluster defaults
290

291
    """
292
    idict = instance.ToDict()
293
    cluster = self._cfg.GetClusterInfo()
294
    idict["hvparams"] = cluster.FillHV(instance)
295
    if hvp is not None:
296
      idict["hvparams"].update(hvp)
297
    idict["beparams"] = cluster.FillBE(instance)
298
    if bep is not None:
299
      idict["beparams"].update(bep)
300
    for nic in idict["nics"]:
301
      nic['nicparams'] = objects.FillDict(
302
        cluster.nicparams[constants.PP_DEFAULT],
303
        nic['nicparams'])
304
    return idict
305

    
306
  def _ConnectList(self, client, node_list, call):
307
    """Helper for computing node addresses.
308

309
    @type client: L{ganeti.rpc.Client}
310
    @param client: a C{Client} instance
311
    @type node_list: list
312
    @param node_list: the node list we should connect
313
    @type call: string
314
    @param call: the name of the remote procedure call, for filling in
315
        correctly any eventual offline nodes' results
316

317
    """
318
    all_nodes = self._cfg.GetAllNodesInfo()
319
    name_list = []
320
    addr_list = []
321
    skip_dict = {}
322
    for node in node_list:
323
      if node in all_nodes:
324
        if all_nodes[node].offline:
325
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
326
          continue
327
        val = all_nodes[node].primary_ip
328
      else:
329
        val = None
330
      addr_list.append(val)
331
      name_list.append(node)
332
    if name_list:
333
      client.ConnectList(name_list, address_list=addr_list)
334
    return skip_dict
335

    
336
  def _ConnectNode(self, client, node, call):
337
    """Helper for computing one node's address.
338

339
    @type client: L{ganeti.rpc.Client}
340
    @param client: a C{Client} instance
341
    @type node: str
342
    @param node: the node we should connect
343
    @type call: string
344
    @param call: the name of the remote procedure call, for filling in
345
        correctly any eventual offline nodes' results
346

347
    """
348
    node_info = self._cfg.GetNodeInfo(node)
349
    if node_info is not None:
350
      if node_info.offline:
351
        return RpcResult(node=node, offline=True, call=call)
352
      addr = node_info.primary_ip
353
    else:
354
      addr = None
355
    client.ConnectNode(node, address=addr)
356

    
357
  def _MultiNodeCall(self, node_list, procedure, args):
358
    """Helper for making a multi-node call
359

360
    """
361
    body = serializer.DumpJson(args, indent=False)
362
    c = Client(procedure, body, self.port)
363
    skip_dict = self._ConnectList(c, node_list, procedure)
364
    skip_dict.update(c.GetResults())
365
    return skip_dict
366

    
367
  @classmethod
368
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
369
                           address_list=None):
370
    """Helper for making a multi-node static call
371

372
    """
373
    body = serializer.DumpJson(args, indent=False)
374
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
375
    c.ConnectList(node_list, address_list=address_list)
376
    return c.GetResults()
377

    
378
  def _SingleNodeCall(self, node, procedure, args):
379
    """Helper for making a single-node call
380

381
    """
382
    body = serializer.DumpJson(args, indent=False)
383
    c = Client(procedure, body, self.port)
384
    result = self._ConnectNode(c, node, procedure)
385
    if result is None:
386
      # we did connect, node is not offline
387
      result = c.GetResults()[node]
388
    return result
389

    
390
  @classmethod
391
  def _StaticSingleNodeCall(cls, node, procedure, args):
392
    """Helper for making a single-node static call
393

394
    """
395
    body = serializer.DumpJson(args, indent=False)
396
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
397
    c.ConnectNode(node)
398
    return c.GetResults()[node]
399

    
400
  @staticmethod
401
  def _Compress(data):
402
    """Compresses a string for transport over RPC.
403

404
    Small amounts of data are not compressed.
405

406
    @type data: str
407
    @param data: Data
408
    @rtype: tuple
409
    @return: Encoded data to send
410

411
    """
412
    # Small amounts of data are not compressed
413
    if len(data) < 512:
414
      return (constants.RPC_ENCODING_NONE, data)
415

    
416
    # Compress with zlib and encode in base64
417
    return (constants.RPC_ENCODING_ZLIB_BASE64,
418
            base64.b64encode(zlib.compress(data, 3)))
419

    
420
  #
421
  # Begin RPC calls
422
  #
423

    
424
  def call_lv_list(self, node_list, vg_name):
425
    """Gets the logical volumes present in a given volume group.
426

427
    This is a multi-node call.
428

429
    """
430
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
431

    
432
  def call_vg_list(self, node_list):
433
    """Gets the volume group list.
434

435
    This is a multi-node call.
436

437
    """
438
    return self._MultiNodeCall(node_list, "vg_list", [])
439

    
440
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
441
    """Get list of storage units.
442

443
    This is a multi-node call.
444

445
    """
446
    return self._MultiNodeCall(node_list, "storage_list",
447
                               [su_name, su_args, name, fields])
448

    
449
  def call_storage_modify(self, node, su_name, su_args, name, changes):
450
    """Modify a storage unit.
451

452
    This is a single-node call.
453

454
    """
455
    return self._SingleNodeCall(node, "storage_modify",
456
                                [su_name, su_args, name, changes])
457

    
458
  def call_storage_execute(self, node, su_name, su_args, name, op):
459
    """Executes an operation on a storage unit.
460

461
    This is a single-node call.
462

463
    """
464
    return self._SingleNodeCall(node, "storage_execute",
465
                                [su_name, su_args, name, op])
466

    
467
  def call_bridges_exist(self, node, bridges_list):
468
    """Checks if a node has all the bridges given.
469

470
    This method checks if all bridges given in the bridges_list are
471
    present on the remote node, so that an instance that uses interfaces
472
    on those bridges can be started.
473

474
    This is a single-node call.
475

476
    """
477
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
478

    
479
  def call_instance_start(self, node, instance, hvp, bep):
480
    """Starts an instance.
481

482
    This is a single-node call.
483

484
    """
485
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
486
    return self._SingleNodeCall(node, "instance_start", [idict])
487

    
488
  def call_instance_shutdown(self, node, instance, timeout):
489
    """Stops an instance.
490

491
    This is a single-node call.
492

493
    """
494
    return self._SingleNodeCall(node, "instance_shutdown",
495
                                [self._InstDict(instance), timeout])
496

    
497
  def call_migration_info(self, node, instance):
498
    """Gather the information necessary to prepare an instance migration.
499

500
    This is a single-node call.
501

502
    @type node: string
503
    @param node: the node on which the instance is currently running
504
    @type instance: C{objects.Instance}
505
    @param instance: the instance definition
506

507
    """
508
    return self._SingleNodeCall(node, "migration_info",
509
                                [self._InstDict(instance)])
510

    
511
  def call_accept_instance(self, node, instance, info, target):
512
    """Prepare a node to accept an instance.
513

514
    This is a single-node call.
515

516
    @type node: string
517
    @param node: the target node for the migration
518
    @type instance: C{objects.Instance}
519
    @param instance: the instance definition
520
    @type info: opaque/hypervisor specific (string/data)
521
    @param info: result for the call_migration_info call
522
    @type target: string
523
    @param target: target hostname (usually ip address) (on the node itself)
524

525
    """
526
    return self._SingleNodeCall(node, "accept_instance",
527
                                [self._InstDict(instance), info, target])
528

    
529
  def call_finalize_migration(self, node, instance, info, success):
530
    """Finalize any target-node migration specific operation.
531

532
    This is called both in case of a successful migration and in case of error
533
    (in which case it should abort the migration).
534

535
    This is a single-node call.
536

537
    @type node: string
538
    @param node: the target node for the migration
539
    @type instance: C{objects.Instance}
540
    @param instance: the instance definition
541
    @type info: opaque/hypervisor specific (string/data)
542
    @param info: result for the call_migration_info call
543
    @type success: boolean
544
    @param success: whether the migration was a success or a failure
545

546
    """
547
    return self._SingleNodeCall(node, "finalize_migration",
548
                                [self._InstDict(instance), info, success])
549

    
550
  def call_instance_migrate(self, node, instance, target, live):
551
    """Migrate an instance.
552

553
    This is a single-node call.
554

555
    @type node: string
556
    @param node: the node on which the instance is currently running
557
    @type instance: C{objects.Instance}
558
    @param instance: the instance definition
559
    @type target: string
560
    @param target: the target node name
561
    @type live: boolean
562
    @param live: whether the migration should be done live or not (the
563
        interpretation of this parameter is left to the hypervisor)
564

565
    """
566
    return self._SingleNodeCall(node, "instance_migrate",
567
                                [self._InstDict(instance), target, live])
568

    
569
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
570
    """Reboots an instance.
571

572
    This is a single-node call.
573

574
    """
575
    return self._SingleNodeCall(node, "instance_reboot",
576
                                [self._InstDict(inst), reboot_type,
577
                                 shutdown_timeout])
578

    
579
  def call_instance_os_add(self, node, inst, reinstall, debug):
580
    """Installs an OS on the given instance.
581

582
    This is a single-node call.
583

584
    """
585
    return self._SingleNodeCall(node, "instance_os_add",
586
                                [self._InstDict(inst), reinstall, debug])
587

    
588
  def call_instance_run_rename(self, node, inst, old_name, debug):
589
    """Run the OS rename script for an instance.
590

591
    This is a single-node call.
592

593
    """
594
    return self._SingleNodeCall(node, "instance_run_rename",
595
                                [self._InstDict(inst), old_name, debug])
596

    
597
  def call_instance_info(self, node, instance, hname):
598
    """Returns information about a single instance.
599

600
    This is a single-node call.
601

602
    @type node: list
603
    @param node: the list of nodes to query
604
    @type instance: string
605
    @param instance: the instance name
606
    @type hname: string
607
    @param hname: the hypervisor type of the instance
608

609
    """
610
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
611

    
612
  def call_instance_migratable(self, node, instance):
613
    """Checks whether the given instance can be migrated.
614

615
    This is a single-node call.
616

617
    @param node: the node to query
618
    @type instance: L{objects.Instance}
619
    @param instance: the instance to check
620

621

622
    """
623
    return self._SingleNodeCall(node, "instance_migratable",
624
                                [self._InstDict(instance)])
625

    
626
  def call_all_instances_info(self, node_list, hypervisor_list):
627
    """Returns information about all instances on the given nodes.
628

629
    This is a multi-node call.
630

631
    @type node_list: list
632
    @param node_list: the list of nodes to query
633
    @type hypervisor_list: list
634
    @param hypervisor_list: the hypervisors to query for instances
635

636
    """
637
    return self._MultiNodeCall(node_list, "all_instances_info",
638
                               [hypervisor_list])
639

    
640
  def call_instance_list(self, node_list, hypervisor_list):
641
    """Returns the list of running instances on a given node.
642

643
    This is a multi-node call.
644

645
    @type node_list: list
646
    @param node_list: the list of nodes to query
647
    @type hypervisor_list: list
648
    @param hypervisor_list: the hypervisors to query for instances
649

650
    """
651
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
652

    
653
  def call_node_tcp_ping(self, node, source, target, port, timeout,
654
                         live_port_needed):
655
    """Do a TcpPing on the remote node
656

657
    This is a single-node call.
658

659
    """
660
    return self._SingleNodeCall(node, "node_tcp_ping",
661
                                [source, target, port, timeout,
662
                                 live_port_needed])
663

    
664
  def call_node_has_ip_address(self, node, address):
665
    """Checks if a node has the given IP address.
666

667
    This is a single-node call.
668

669
    """
670
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
671

    
672
  def call_node_info(self, node_list, vg_name, hypervisor_type):
673
    """Return node information.
674

675
    This will return memory information and volume group size and free
676
    space.
677

678
    This is a multi-node call.
679

680
    @type node_list: list
681
    @param node_list: the list of nodes to query
682
    @type vg_name: C{string}
683
    @param vg_name: the name of the volume group to ask for disk space
684
        information
685
    @type hypervisor_type: C{str}
686
    @param hypervisor_type: the name of the hypervisor to ask for
687
        memory information
688

689
    """
690
    return self._MultiNodeCall(node_list, "node_info",
691
                               [vg_name, hypervisor_type])
692

    
693
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
694
    """Add a node to the cluster.
695

696
    This is a single-node call.
697

698
    """
699
    return self._SingleNodeCall(node, "node_add",
700
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
701

    
702
  def call_node_verify(self, node_list, checkdict, cluster_name):
703
    """Request verification of given parameters.
704

705
    This is a multi-node call.
706

707
    """
708
    return self._MultiNodeCall(node_list, "node_verify",
709
                               [checkdict, cluster_name])
710

    
711
  @classmethod
712
  def call_node_start_master(cls, node, start_daemons, no_voting):
713
    """Tells a node to activate itself as a master.
714

715
    This is a single-node call.
716

717
    """
718
    return cls._StaticSingleNodeCall(node, "node_start_master",
719
                                     [start_daemons, no_voting])
720

    
721
  @classmethod
722
  def call_node_stop_master(cls, node, stop_daemons):
723
    """Tells a node to demote itself from master status.
724

725
    This is a single-node call.
726

727
    """
728
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
729

    
730
  @classmethod
731
  def call_master_info(cls, node_list):
732
    """Query master info.
733

734
    This is a multi-node call.
735

736
    """
737
    # TODO: should this method query down nodes?
738
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
739

    
740
  @classmethod
741
  def call_version(cls, node_list):
742
    """Query node version.
743

744
    This is a multi-node call.
745

746
    """
747
    return cls._StaticMultiNodeCall(node_list, "version", [])
748

    
749
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
750
    """Request creation of a given block device.
751

752
    This is a single-node call.
753

754
    """
755
    return self._SingleNodeCall(node, "blockdev_create",
756
                                [bdev.ToDict(), size, owner, on_primary, info])
757

    
758
  def call_blockdev_remove(self, node, bdev):
759
    """Request removal of a given block device.
760

761
    This is a single-node call.
762

763
    """
764
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
765

    
766
  def call_blockdev_rename(self, node, devlist):
767
    """Request rename of the given block devices.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "blockdev_rename",
773
                                [(d.ToDict(), uid) for d, uid in devlist])
774

    
775
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
776
    """Request assembling of a given block device.
777

778
    This is a single-node call.
779

780
    """
781
    return self._SingleNodeCall(node, "blockdev_assemble",
782
                                [disk.ToDict(), owner, on_primary])
783

    
784
  def call_blockdev_shutdown(self, node, disk):
785
    """Request shutdown of a given block device.
786

787
    This is a single-node call.
788

789
    """
790
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
791

    
792
  def call_blockdev_addchildren(self, node, bdev, ndevs):
793
    """Request adding a list of children to a (mirroring) device.
794

795
    This is a single-node call.
796

797
    """
798
    return self._SingleNodeCall(node, "blockdev_addchildren",
799
                                [bdev.ToDict(),
800
                                 [disk.ToDict() for disk in ndevs]])
801

    
802
  def call_blockdev_removechildren(self, node, bdev, ndevs):
803
    """Request removing a list of children from a (mirroring) device.
804

805
    This is a single-node call.
806

807
    """
808
    return self._SingleNodeCall(node, "blockdev_removechildren",
809
                                [bdev.ToDict(),
810
                                 [disk.ToDict() for disk in ndevs]])
811

    
812
  def call_blockdev_getmirrorstatus(self, node, disks):
813
    """Request status of a (mirroring) device.
814

815
    This is a single-node call.
816

817
    """
818
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
819
                                  [dsk.ToDict() for dsk in disks])
820
    if not result.fail_msg:
821
      result.payload = [objects.BlockDevStatus.FromDict(i)
822
                        for i in result.payload]
823
    return result
824

    
825
  def call_blockdev_find(self, node, disk):
826
    """Request identification of a given block device.
827

828
    This is a single-node call.
829

830
    """
831
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
832
    if not result.fail_msg and result.payload is not None:
833
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
834
    return result
835

    
836
  def call_blockdev_close(self, node, instance_name, disks):
837
    """Closes the given block devices.
838

839
    This is a single-node call.
840

841
    """
842
    params = [instance_name, [cf.ToDict() for cf in disks]]
843
    return self._SingleNodeCall(node, "blockdev_close", params)
844

    
845
  def call_blockdev_getsizes(self, node, disks):
846
    """Returns the size of the given disks.
847

848
    This is a single-node call.
849

850
    """
851
    params = [[cf.ToDict() for cf in disks]]
852
    return self._SingleNodeCall(node, "blockdev_getsize", params)
853

    
854
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
855
    """Disconnects the network of the given drbd devices.
856

857
    This is a multi-node call.
858

859
    """
860
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
861
                               [nodes_ip, [cf.ToDict() for cf in disks]])
862

    
863
  def call_drbd_attach_net(self, node_list, nodes_ip,
864
                           disks, instance_name, multimaster):
865
    """Disconnects the given drbd devices.
866

867
    This is a multi-node call.
868

869
    """
870
    return self._MultiNodeCall(node_list, "drbd_attach_net",
871
                               [nodes_ip, [cf.ToDict() for cf in disks],
872
                                instance_name, multimaster])
873

    
874
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
875
    """Waits for the synchronization of drbd devices is complete.
876

877
    This is a multi-node call.
878

879
    """
880
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
881
                               [nodes_ip, [cf.ToDict() for cf in disks]])
882

    
883
  @classmethod
884
  def call_upload_file(cls, node_list, file_name, address_list=None):
885
    """Upload a file.
886

887
    The node will refuse the operation in case the file is not on the
888
    approved file list.
889

890
    This is a multi-node call.
891

892
    @type node_list: list
893
    @param node_list: the list of node names to upload to
894
    @type file_name: str
895
    @param file_name: the filename to upload
896
    @type address_list: list or None
897
    @keyword address_list: an optional list of node addresses, in order
898
        to optimize the RPC speed
899

900
    """
901
    file_contents = utils.ReadFile(file_name)
902
    data = cls._Compress(file_contents)
903
    st = os.stat(file_name)
904
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
905
              st.st_atime, st.st_mtime]
906
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
907
                                    address_list=address_list)
908

    
909
  @classmethod
910
  def call_write_ssconf_files(cls, node_list, values):
911
    """Write ssconf files.
912

913
    This is a multi-node call.
914

915
    """
916
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
917

    
918
  def call_os_diagnose(self, node_list):
919
    """Request a diagnose of OS definitions.
920

921
    This is a multi-node call.
922

923
    """
924
    return self._MultiNodeCall(node_list, "os_diagnose", [])
925

    
926
  def call_os_get(self, node, name):
927
    """Returns an OS definition.
928

929
    This is a single-node call.
930

931
    """
932
    result = self._SingleNodeCall(node, "os_get", [name])
933
    if not result.fail_msg and isinstance(result.payload, dict):
934
      result.payload = objects.OS.FromDict(result.payload)
935
    return result
936

    
937
  def call_hooks_runner(self, node_list, hpath, phase, env):
938
    """Call the hooks runner.
939

940
    Args:
941
      - op: the OpCode instance
942
      - env: a dictionary with the environment
943

944
    This is a multi-node call.
945

946
    """
947
    params = [hpath, phase, env]
948
    return self._MultiNodeCall(node_list, "hooks_runner", params)
949

    
950
  def call_iallocator_runner(self, node, name, idata):
951
    """Call an iallocator on a remote node
952

953
    Args:
954
      - name: the iallocator name
955
      - input: the json-encoded input string
956

957
    This is a single-node call.
958

959
    """
960
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
961

    
962
  def call_blockdev_grow(self, node, cf_bdev, amount):
963
    """Request a snapshot of the given block device.
964

965
    This is a single-node call.
966

967
    """
968
    return self._SingleNodeCall(node, "blockdev_grow",
969
                                [cf_bdev.ToDict(), amount])
970

    
971
  def call_blockdev_export(self, node, cf_bdev,
972
                           dest_node, dest_path, cluster_name):
973
    """Export a given disk to another node.
974

975
    This is a single-node call.
976

977
    """
978
    return self._SingleNodeCall(node, "blockdev_export",
979
                                [cf_bdev.ToDict(), dest_node, dest_path,
980
                                 cluster_name])
981

    
982
  def call_blockdev_snapshot(self, node, cf_bdev):
983
    """Request a snapshot of the given block device.
984

985
    This is a single-node call.
986

987
    """
988
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
989

    
990
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
991
                           cluster_name, idx, debug):
992
    """Request the export of a given snapshot.
993

994
    This is a single-node call.
995

996
    """
997
    return self._SingleNodeCall(node, "snapshot_export",
998
                                [snap_bdev.ToDict(), dest_node,
999
                                 self._InstDict(instance), cluster_name,
1000
                                 idx, debug])
1001

    
1002
  def call_finalize_export(self, node, instance, snap_disks):
1003
    """Request the completion of an export operation.
1004

1005
    This writes the export config file, etc.
1006

1007
    This is a single-node call.
1008

1009
    """
1010
    flat_disks = []
1011
    for disk in snap_disks:
1012
      if isinstance(disk, bool):
1013
        flat_disks.append(disk)
1014
      else:
1015
        flat_disks.append(disk.ToDict())
1016

    
1017
    return self._SingleNodeCall(node, "finalize_export",
1018
                                [self._InstDict(instance), flat_disks])
1019

    
1020
  def call_export_info(self, node, path):
1021
    """Queries the export information in a given path.
1022

1023
    This is a single-node call.
1024

1025
    """
1026
    return self._SingleNodeCall(node, "export_info", [path])
1027

    
1028
  def call_instance_os_import(self, node, inst, src_node, src_images,
1029
                              cluster_name, debug):
1030
    """Request the import of a backup into an instance.
1031

1032
    This is a single-node call.
1033

1034
    """
1035
    return self._SingleNodeCall(node, "instance_os_import",
1036
                                [self._InstDict(inst), src_node, src_images,
1037
                                 cluster_name, debug])
1038

    
1039
  def call_export_list(self, node_list):
1040
    """Gets the stored exports list.
1041

1042
    This is a multi-node call.
1043

1044
    """
1045
    return self._MultiNodeCall(node_list, "export_list", [])
1046

    
1047
  def call_export_remove(self, node, export):
1048
    """Requests removal of a given export.
1049

1050
    This is a single-node call.
1051

1052
    """
1053
    return self._SingleNodeCall(node, "export_remove", [export])
1054

    
1055
  @classmethod
1056
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1057
    """Requests a node to clean the cluster information it has.
1058

1059
    This will remove the configuration information from the ganeti data
1060
    dir.
1061

1062
    This is a single-node call.
1063

1064
    """
1065
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1066
                                     [modify_ssh_setup])
1067

    
1068
  def call_node_volumes(self, node_list):
1069
    """Gets all volumes on node(s).
1070

1071
    This is a multi-node call.
1072

1073
    """
1074
    return self._MultiNodeCall(node_list, "node_volumes", [])
1075

    
1076
  def call_node_demote_from_mc(self, node):
1077
    """Demote a node from the master candidate role.
1078

1079
    This is a single-node call.
1080

1081
    """
1082
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1083

    
1084

    
1085
  def call_node_powercycle(self, node, hypervisor):
1086
    """Tries to powercycle a node.
1087

1088
    This is a single-node call.
1089

1090
    """
1091
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1092

    
1093

    
1094
  def call_test_delay(self, node_list, duration):
1095
    """Sleep for a fixed time on given node(s).
1096

1097
    This is a multi-node call.
1098

1099
    """
1100
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1101

    
1102
  def call_file_storage_dir_create(self, node, file_storage_dir):
1103
    """Create the given file storage directory.
1104

1105
    This is a single-node call.
1106

1107
    """
1108
    return self._SingleNodeCall(node, "file_storage_dir_create",
1109
                                [file_storage_dir])
1110

    
1111
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1112
    """Remove the given file storage directory.
1113

1114
    This is a single-node call.
1115

1116
    """
1117
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1118
                                [file_storage_dir])
1119

    
1120
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1121
                                   new_file_storage_dir):
1122
    """Rename file storage directory.
1123

1124
    This is a single-node call.
1125

1126
    """
1127
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1128
                                [old_file_storage_dir, new_file_storage_dir])
1129

    
1130
  @classmethod
1131
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1132
    """Update job queue.
1133

1134
    This is a multi-node call.
1135

1136
    """
1137
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1138
                                    [file_name, cls._Compress(content)],
1139
                                    address_list=address_list)
1140

    
1141
  @classmethod
1142
  def call_jobqueue_purge(cls, node):
1143
    """Purge job queue.
1144

1145
    This is a single-node call.
1146

1147
    """
1148
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1149

    
1150
  @classmethod
1151
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1152
    """Rename a job queue file.
1153

1154
    This is a multi-node call.
1155

1156
    """
1157
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1158
                                    address_list=address_list)
1159

    
1160
  @classmethod
1161
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1162
    """Set the drain flag on the queue.
1163

1164
    This is a multi-node call.
1165

1166
    @type node_list: list
1167
    @param node_list: the list of nodes to query
1168
    @type drain_flag: bool
1169
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1170

1171
    """
1172
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1173
                                    [drain_flag])
1174

    
1175
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1176
    """Validate the hypervisor params.
1177

1178
    This is a multi-node call.
1179

1180
    @type node_list: list
1181
    @param node_list: the list of nodes to query
1182
    @type hvname: string
1183
    @param hvname: the hypervisor name
1184
    @type hvparams: dict
1185
    @param hvparams: the hypervisor parameters to be validated
1186

1187
    """
1188
    cluster = self._cfg.GetClusterInfo()
1189
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1190
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1191
                               [hvname, hv_full])